Skip to content

Commit

Permalink
feat: add pre signed read state call wait (#511)
Browse files Browse the repository at this point in the history
  • Loading branch information
NikolasHai authored Feb 7, 2024
1 parent c20ba0c commit 6330d29
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 80 additions & 6 deletions ic-agent/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::{
to_request_id, RequestId,
};
use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
use backoff::{exponential::ExponentialBackoff, SystemClock};
use ic_certification::{Certificate, Delegation, Label};
use ic_transport_types::{
signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
Expand Down Expand Up @@ -656,18 +657,91 @@ impl Agent {
}
}

fn get_retry_policy() -> ExponentialBackoff<SystemClock> {
ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_millis(500))
.with_max_interval(Duration::from_secs(1))
.with_multiplier(1.4)
.with_max_elapsed_time(Some(Duration::from_secs(60 * 5)))
.build()
}

/// Wait for request_status to return a Replied response and return the arg.
pub async fn wait_signed(
&self,
request_id: &RequestId,
effective_canister_id: Principal,
signed_request_status: Vec<u8>,
) -> Result<Vec<u8>, AgentError> {
let mut retry_policy = Self::get_retry_policy();

let mut request_accepted = false;
loop {
match self
.request_status_signed(
request_id,
effective_canister_id,
signed_request_status.clone(),
)
.await?
{
RequestStatusResponse::Unknown => {}

RequestStatusResponse::Received | RequestStatusResponse::Processing => {
if !request_accepted {
retry_policy.reset();
request_accepted = true;
}
}

RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => return Ok(arg),

RequestStatusResponse::Rejected(response) => {
return Err(AgentError::ReplicaError(response))
}

RequestStatusResponse::Done => {
return Err(AgentError::RequestStatusDoneNoReply(String::from(
*request_id,
)))
}
};

match retry_policy.next_backoff() {
#[cfg(not(target_family = "wasm"))]
Some(duration) => tokio::time::sleep(duration).await,

#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
Some(duration) => {
wasm_bindgen_futures::JsFuture::from(js_sys::Promise::new(&mut |rs, rj| {
if let Err(e) = web_sys::window()
.expect("global window unavailable")
.set_timeout_with_callback_and_timeout_and_arguments_0(
&rs,
duration.as_millis() as _,
)
{
use wasm_bindgen::UnwrapThrowExt;
rj.call1(&rj, &e).unwrap_throw();
}
}))
.await
.expect("unable to setTimeout");
}

None => return Err(AgentError::TimeoutWaitingForResponse()),
}
}
}

/// Call request_status on the RequestId in a loop and return the response as a byte vector.
pub async fn wait(
&self,
request_id: RequestId,
effective_canister_id: Principal,
) -> Result<Vec<u8>, AgentError> {
let mut retry_policy = ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_millis(500))
.with_max_interval(Duration::from_secs(1))
.with_multiplier(1.4)
.with_max_elapsed_time(Some(Duration::from_secs(60 * 5)))
.build();
let mut retry_policy = Self::get_retry_policy();

let mut request_accepted = false;
loop {
match self.poll(&request_id, effective_canister_id).await? {
Expand Down
1 change: 1 addition & 0 deletions ref-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ tokio = { workspace = true, features = ["full"] }

[dev-dependencies]
serde_cbor = { workspace = true }
ic-certification = { workspace = true }
87 changes: 86 additions & 1 deletion ref-tests/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
//! integration tests with a running IC-Ref.
use candid::CandidType;
use ic_agent::{
agent::{agent_error::HttpErrorPayload, RejectCode, RejectResponse},
agent::{agent_error::HttpErrorPayload, Envelope, EnvelopeContent, RejectCode, RejectResponse},
export::Principal,
AgentError, Identity,
};
use ic_certification::Label;
use ic_utils::{
call::{AsyncCall, SyncCall},
interfaces::{
Expand All @@ -21,6 +22,12 @@ use ref_tests::{
get_wallet_wasm_from_env, universal_canister::payload, with_universal_canister,
with_wallet_canister,
};
use serde::Serialize;
use std::{
borrow::Cow,
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};

#[ignore]
#[test]
Expand Down Expand Up @@ -64,6 +71,84 @@ fn basic_expiry() {
})
}

#[ignore]
#[test]
fn wait_signed() {
with_universal_canister(|mut agent, canister_id| async move {
fn serialized_bytes(envelope: Envelope) -> Vec<u8> {
let mut serialized_bytes = Vec::new();
let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
serializer.self_describe().unwrap();
envelope.serialize(&mut serializer).unwrap();
serialized_bytes
}

let arg = payload().reply_data(b"hello").build();
let ingress_expiry = (SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
+ Duration::from_secs(120))
.as_nanos() as u64;

let agent_identity = Arc::new(create_basic_identity().unwrap());
agent.set_arc_identity(agent_identity.clone());

let call_envelope_content = EnvelopeContent::Call {
sender: agent.get_principal().unwrap(),
arg: arg.clone(),
ingress_expiry,
nonce: None,
canister_id,
method_name: "update".to_string(),
};

let call_request_id = call_envelope_content.to_request_id();
let call_signature = agent_identity.sign(&call_envelope_content).unwrap();

let call_envelope = Envelope {
content: Cow::Borrowed(&call_envelope_content),
sender_pubkey: call_signature.public_key,
sender_sig: call_signature.signature,
sender_delegation: call_signature.delegations,
};

let call_envelope_serialized = serialized_bytes(call_envelope);

agent
.update_signed(canister_id, call_envelope_serialized)
.await
.unwrap();

let paths: Vec<Vec<Label>> = vec![vec![
"request_status".into(),
call_request_id.to_vec().into(),
]];
let read_state_envelope_content = EnvelopeContent::ReadState {
sender: agent.get_principal().unwrap(),
paths,
ingress_expiry,
};

let read_signature = agent_identity.sign(&read_state_envelope_content).unwrap();

let read_state_envelope = Envelope {
content: Cow::Borrowed(&read_state_envelope_content),
sender_pubkey: read_signature.public_key,
sender_sig: read_signature.signature,
sender_delegation: read_signature.delegations,
};

let read_envelope_serialized = serialized_bytes(read_state_envelope);

let result = agent
.wait_signed(&call_request_id, canister_id, read_envelope_serialized)
.await
.unwrap();

assert_eq!(result.as_slice(), b"hello");

Ok(())
})
}

#[ignore]
#[test]
fn canister_query() {
Expand Down

0 comments on commit 6330d29

Please sign in to comment.