Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
Add Hrana 3 (#548)
Browse files Browse the repository at this point in the history
* Sketch the Hrana 3 spec

* Sketch Protobuf encoding of shared Hrana structures

* Sketch Hrana 3 over WebSocket (untested)

* Change `get_state` Hrana request to `get_autocommit`

* Add Protobuf encoding for Hrana over HTTP

* Add cursors to Hrana over HTTP

* Fixup after rebase

* Make clippy happy

* Changes from code review
  • Loading branch information
honzasp authored Aug 23, 2023
1 parent f19ffe2 commit 10b0591
Show file tree
Hide file tree
Showing 28 changed files with 3,712 additions and 296 deletions.
1,710 changes: 1,710 additions & 0 deletions docs/HRANA_3_SPEC.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions sqld/proto/proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ message Cond {
NotCond not = 3;
AndCond and = 4;
OrCond or = 5;
IsAutocommitCond is_autocommit = 6;
}
}

Expand All @@ -123,6 +124,9 @@ message OrCond {
repeated Cond conds = 1;
}

message IsAutocommitCond {
}

enum Authorized {
READONLY = 0;
FULL = 1;
Expand Down
39 changes: 29 additions & 10 deletions sqld/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,9 @@ impl<'a> Connection<'a> {
builder: &mut impl QueryResultBuilder,
) -> Result<bool> {
builder.begin_step()?;

let mut enabled = match step.cond.as_ref() {
Some(cond) => match eval_cond(cond, results) {
Some(cond) => match eval_cond(cond, results, self.is_autocommit()) {
Ok(enabled) => enabled,
Err(e) => {
builder.step_error(e).unwrap();
Expand Down Expand Up @@ -453,25 +454,29 @@ impl<'a> Connection<'a> {
is_readonly,
})
}

fn is_autocommit(&self) -> bool {
self.conn.is_autocommit()
}
}

fn eval_cond(cond: &Cond, results: &[bool]) -> Result<bool> {
fn eval_cond(cond: &Cond, results: &[bool], is_autocommit: bool) -> Result<bool> {
let get_step_res = |step: usize| -> Result<bool> {
let res = results.get(step).ok_or(Error::InvalidBatchStep(step))?;

Ok(*res)
};

Ok(match cond {
Cond::Ok { step } => get_step_res(*step)?,
Cond::Err { step } => !get_step_res(*step)?,
Cond::Not { cond } => !eval_cond(cond, results)?,
Cond::And { conds } => conds
.iter()
.try_fold(true, |x, cond| eval_cond(cond, results).map(|y| x & y))?,
Cond::Or { conds } => conds
.iter()
.try_fold(false, |x, cond| eval_cond(cond, results).map(|y| x | y))?,
Cond::Not { cond } => !eval_cond(cond, results, is_autocommit)?,
Cond::And { conds } => conds.iter().try_fold(true, |x, cond| {
eval_cond(cond, results, is_autocommit).map(|y| x & y)
})?,
Cond::Or { conds } => conds.iter().try_fold(false, |x, cond| {
eval_cond(cond, results, is_autocommit).map(|y| x | y)
})?,
Cond::IsAutocommit => is_autocommit,
})
}

Expand Down Expand Up @@ -558,6 +563,20 @@ impl super::Connection for LibSqlConnection {

Ok(receiver.await?)
}

async fn is_autocommit(&self) -> Result<bool> {
let (resp, receiver) = oneshot::channel();
let cb = Box::new(move |maybe_conn: Result<&mut Connection>| {
let res = maybe_conn.map(|c| c.is_autocommit());
if resp.send(res).is_err() {
anyhow::bail!("connection closed");
}
Ok(())
});

let _: Result<_, _> = self.sender.send(cb);
receiver.await?
}
}

#[cfg(test)]
Expand Down
12 changes: 12 additions & 0 deletions sqld/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ pub trait Connection: Send + Sync + 'static {

/// Parse the SQL statement and return information about it.
async fn describe(&self, sql: String, auth: Authenticated) -> Result<DescribeResult>;

/// Check whether the connection is in autocommit mode.
async fn is_autocommit(&self) -> Result<bool>;
}

fn make_batch_program(batch: Vec<Query>) -> Vec<Step> {
Expand Down Expand Up @@ -273,6 +276,11 @@ impl<DB: Connection> Connection for TrackedConnection<DB> {
async fn describe(&self, sql: String, auth: Authenticated) -> crate::Result<DescribeResult> {
self.inner.describe(sql, auth).await
}

#[inline]
async fn is_autocommit(&self) -> crate::Result<bool> {
self.inner.is_autocommit().await
}
}

#[cfg(test)]
Expand All @@ -299,6 +307,10 @@ mod test {
) -> crate::Result<DescribeResult> {
unreachable!()
}

async fn is_autocommit(&self) -> crate::Result<bool> {
unreachable!()
}
}

#[tokio::test]
Expand Down
1 change: 1 addition & 0 deletions sqld/src/connection/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub enum Cond {
Not { cond: Box<Self> },
Or { conds: Vec<Self> },
And { conds: Vec<Self> },
IsAutocommit,
}

pub type DescribeResult = crate::Result<DescribeResponse>;
Expand Down
8 changes: 8 additions & 0 deletions sqld/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,14 @@ impl Connection for WriteProxyConnection {
self.wait_replication_sync().await?;
self.read_db.describe(sql, auth).await
}

async fn is_autocommit(&self) -> Result<bool> {
let state = self.state.lock().await;
Ok(match *state {
State::Txn => false,
State::Init | State::Invalid => true,
})
}
}

impl Drop for WriteProxyConnection {
Expand Down
51 changes: 38 additions & 13 deletions sqld/src/hrana/batch.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{anyhow, Result};
use anyhow::{anyhow, bail, Result};
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -27,8 +27,12 @@ pub enum BatchError {
ResponseTooLarge,
}

fn proto_cond_to_cond(cond: &proto::BatchCond, max_step_i: usize) -> Result<Cond> {
let try_convert_step = |step: i32| -> Result<usize, ProtocolError> {
fn proto_cond_to_cond(
cond: &proto::BatchCond,
version: Version,
max_step_i: usize,
) -> Result<Cond> {
let try_convert_step = |step: u32| -> Result<usize, ProtocolError> {
let step = usize::try_from(step).map_err(|_| ProtocolError::BatchCondBadStep)?;
if step >= max_step_i {
return Err(ProtocolError::BatchCondBadStep);
Expand All @@ -37,27 +41,41 @@ fn proto_cond_to_cond(cond: &proto::BatchCond, max_step_i: usize) -> Result<Cond
};

let cond = match cond {
proto::BatchCond::None => {
bail!(ProtocolError::NoneBatchCond)
}
proto::BatchCond::Ok { step } => Cond::Ok {
step: try_convert_step(*step)?,
},
proto::BatchCond::Error { step } => Cond::Err {
step: try_convert_step(*step)?,
},
proto::BatchCond::Not { cond } => Cond::Not {
cond: proto_cond_to_cond(cond, max_step_i)?.into(),
cond: proto_cond_to_cond(cond, version, max_step_i)?.into(),
},
proto::BatchCond::And { conds } => Cond::And {
conds: conds
proto::BatchCond::And(cond_list) => Cond::And {
conds: cond_list
.conds
.iter()
.map(|cond| proto_cond_to_cond(cond, max_step_i))
.map(|cond| proto_cond_to_cond(cond, version, max_step_i))
.collect::<Result<_>>()?,
},
proto::BatchCond::Or { conds } => Cond::Or {
conds: conds
proto::BatchCond::Or(cond_list) => Cond::Or {
conds: cond_list
.conds
.iter()
.map(|cond| proto_cond_to_cond(cond, max_step_i))
.map(|cond| proto_cond_to_cond(cond, version, max_step_i))
.collect::<Result<_>>()?,
},
proto::BatchCond::IsAutocommit {} => {
if version < Version::Hrana3 {
bail!(ProtocolError::NotSupported {
what: "BatchCond of type `is_autocommit`",
min_version: Version::Hrana3,
})
}
Cond::IsAutocommit
}
};

Ok(cond)
Expand All @@ -74,7 +92,7 @@ pub fn proto_batch_to_program(
let cond = step
.condition
.as_ref()
.map(|cond| proto_cond_to_cond(cond, step_i))
.map(|cond| proto_cond_to_cond(cond, version, step_i))
.transpose()?;
let step = Step { query, cond };

Expand Down Expand Up @@ -149,12 +167,12 @@ pub async fn execute_sequence(

fn catch_batch_error(sqld_error: SqldError) -> anyhow::Error {
match batch_error_from_sqld_error(sqld_error) {
Ok(stmt_error) => anyhow!(stmt_error),
Ok(batch_error) => anyhow!(batch_error),
Err(sqld_error) => anyhow!(sqld_error),
}
}

fn batch_error_from_sqld_error(sqld_error: SqldError) -> Result<BatchError, SqldError> {
pub fn batch_error_from_sqld_error(sqld_error: SqldError) -> Result<BatchError, SqldError> {
Ok(match sqld_error {
SqldError::LibSqlTxTimeout => BatchError::TransactionTimeout,
SqldError::LibSqlTxBusy => BatchError::TransactionBusy,
Expand All @@ -165,6 +183,13 @@ fn batch_error_from_sqld_error(sqld_error: SqldError) -> Result<BatchError, Sqld
})
}

pub fn proto_error_from_batch_error(error: &BatchError) -> proto::Error {
proto::Error {
message: error.to_string(),
code: error.code().into(),
}
}

impl BatchError {
pub fn code(&self) -> &'static str {
match self {
Expand Down
Loading

0 comments on commit 10b0591

Please sign in to comment.