Skip to content

Commit

Permalink
Support basic features.
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii committed Nov 29, 2019
1 parent c677f3f commit 7ced3c6
Show file tree
Hide file tree
Showing 16 changed files with 406 additions and 294 deletions.
7 changes: 1 addition & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ description = "rsocket-rust is an implementation of the RSocket protocol in Rust
[dependencies]
matches = "0.1.8"
log = "0.4.8"
bytes = "0.5.1"
bytes = "0.5.2"
futures = "0.3.1"
lazy_static = "1.4.0"
# reactor_rs = {git = "https://github.com/jjeffcaii/reactor-rust", branch = "develop"}
Expand All @@ -27,7 +27,6 @@ version = "0.2.0"
default-features = false
features = ["codec"]


[dev-dependencies]
env_logger = "0.7.1"
hex = "0.4.0"
Expand All @@ -37,10 +36,6 @@ rand = "0.7.2"
name = "echo"
path = "examples/echo/main.rs"

[[example]]
name = "echo_client"
path = "examples/echo_client/main.rs"

# [[example]]
# name = "proxy"
# path = "examples/proxy/main.rs"
85 changes: 38 additions & 47 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[![License](https://img.shields.io/github/license/rsocket/rsocket-rust.svg)](https://github.com/rsocket/rsocket-rust/blob/master/LICENSE)
[![GitHub Release](https://img.shields.io/github/release-pre/rsocket/rsocket-rust.svg)](https://github.com/rsocket/rsocket-rust/releases)

> rsocket-rust is an implementation of the RSocket protocol in Rust.
> rsocket-rust is an implementation of the RSocket protocol in Rust(1.39+).
It's an **alpha** version and still under active development. **Do not use it in a production environment!**

## Example
Expand All @@ -15,66 +15,57 @@ It's an **alpha** version and still under active development. **Do not use it in
### Server

```rust
extern crate bytes;
extern crate futures;
extern crate rsocket_rust;

use bytes::Bytes;
use futures::prelude::*;
extern crate tokio;
#[macro_use]
extern crate log;
use rsocket_rust::prelude::*;

#[test]
fn test_serve() {
RSocketFactory::receive()
.transport(URI::Tcp("127.0.0.1:7878"))
.acceptor(|setup, sending_socket| {
println!("accept setup: {:?}", setup);
// TODO: use tokio runtime?
std::thread::spawn(move || {
let resp = sending_socket
.request_response(
Payload::builder()
.set_data(Bytes::from("Hello Client!"))
.build(),
)
.wait()
.unwrap();
println!(">>>>> response success: {:?}", resp);
});
Box::new(MockResponder)
})
.serve()
.wait()
.unwrap();
use std::env;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::builder().init();
let addr = env::args().nth(1).unwrap_or("127.0.0.1:7878".to_string());

RSocketFactory::receive()
.transport(URI::Tcp(addr))
.acceptor(|setup, _socket| {
info!("accept setup: {:?}", setup);
Box::new(EchoRSocket)
})
.serve()
.await
}

```

### Client

```rust
extern crate futures;
extern crate rsocket_rust;

use futures::prelude::*;
use rsocket_rust::prelude::*;

#[tokio::main]
#[test]
fn test_client() {
let cli = RSocketFactory::connect()
.acceptor(||Box::new(MockResponder))
.transport(URI::Tcp("127.0.0.1:7878"))
.setup(Payload::from("READY!"))
.mime_type("text/plain", "text/plain")
.start()
.unwrap();
let pa = Payload::builder()
.set_data_utf8("Hello World!")
.set_metadata_utf8("Rust!")
.build();
let resp = cli.request_response(pa).wait().unwrap();
println!("******* response: {:?}", resp);
async fn test() {
let cli = RSocketFactory::connect()
.acceptor(|| Box::new(EchoRSocket))
.transport(URI::Tcp("127.0.0.1:7878".to_string()))
.setup(Payload::from("READY!"))
.mime_type("text/plain", "text/plain")
.start()
.await
.unwrap();
let req = Payload::builder()
.set_data_utf8("Hello World!")
.set_metadata_utf8("Rust")
.build();
let res = cli.request_response(req).await.unwrap();
println!("got: {:?}", res);
cli.close();
}

```

## Dependencies
Expand Down
22 changes: 1 addition & 21 deletions examples/echo/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,10 @@ async fn main() -> Result<(), Box<dyn Error>> {

RSocketFactory::receive()
.transport(URI::Tcp(addr))
.acceptor(|setup, sending_socket| {
.acceptor(|setup, _socket| {
info!("accept setup: {:?}", setup);
Box::new(EchoRSocket)
})
.serve()
.await

// let addr = env::args().nth(1).unwrap_or("127.0.0.1:7878".to_string());
// let mut listener = TcpListener::bind(&addr).await?;
// println!("Listening on: {}", addr);

// loop {
// let (mut socket, _) = listener.accept().await?;
// let (rcv_tx, mut rcv_rx) = mpsc::unbounded_channel::<Frame>();
// let (snd_tx, snd_rx) = mpsc::unbounded_channel::<Frame>();

// tokio::spawn(
// async move { rsocket_rust::transport::tcp::process(socket, snd_rx, rcv_tx).await },
// );

// let ds = DuplexSocket::new(0, snd_tx.clone());
// tokio::spawn(async move {
// let acceptor = Acceptor::Generate(Arc::new(|setup, socket| Box::new(EchoRSocket)));
// ds.event_loop(acceptor, rcv_rx).await;
// });
// }
}
21 changes: 0 additions & 21 deletions examples/echo_client/main.rs

This file was deleted.

2 changes: 1 addition & 1 deletion src/frame/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub struct U24;

impl U24 {
pub fn max() -> usize {
0x00FFFFFF
0x00FF_FFFF
}

pub fn write(n: u32, bf: &mut BytesMut) {
Expand Down
30 changes: 18 additions & 12 deletions src/mime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,51 +201,57 @@ impl WellKnownMIME {

pub fn raw(&self) -> u8 {
let (v, _) = MIME_MAP.get(self).unwrap();
v.clone()
*v
}
}

impl From<String> for WellKnownMIME {
fn from(s: String) -> WellKnownMIME {
let mut result = WellKnownMIME::Unknown;
for (k, v) in MIME_MAP.iter() {
let (_, vv) = v;
if vv.to_string() == s {
return k.clone();
if *vv == s {
result = k.clone();
break;
}
}
return WellKnownMIME::Unknown;
result
}
}

impl From<&str> for WellKnownMIME {
fn from(s: &str) -> WellKnownMIME {
let mut result = WellKnownMIME::Unknown;
for (k, v) in MIME_MAP.iter() {
let (_, vv) = v;
if vv.to_string() == s.to_string() {
return k.clone();
if *vv == s {
result = k.clone();
break;
}
}
return WellKnownMIME::Unknown;
result
}
}

impl From<u8> for WellKnownMIME {
fn from(n: u8) -> WellKnownMIME {
let mut result = WellKnownMIME::Unknown;
for (k, v) in MIME_MAP.iter() {
let (a, _) = v;
if *a == n {
return k.clone();
result = k.clone();
break;
}
}
return WellKnownMIME::Unknown;
result
}
}

impl fmt::Display for WellKnownMIME {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
return match MIME_MAP.get(self) {
match MIME_MAP.get(self) {
Some((_, v)) => write!(f, "{}", v),
None => write!(f, "{}", "unknown"),
};
None => write!(f, "unknown"),
}
}
}
2 changes: 1 addition & 1 deletion src/payload/default.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::frame;
use bytes::Bytes;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Payload {
m: Option<Bytes>,
d: Option<Bytes>,
Expand Down
1 change: 1 addition & 0 deletions src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pub use crate::payload::*;
pub use crate::result::*;
pub use crate::spi::*;
pub use crate::x::{Client, RSocketFactory, URI};
pub use futures::{Sink, SinkExt, Stream, StreamExt};
49 changes: 32 additions & 17 deletions src/spi.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,48 @@
use crate::{
errors::{ErrorKind, RSocketError},
frame,
payload::{Payload, SetupPayload},
result::RSocketResult,
};
use crate::errors::{ErrorKind, RSocketError};
use crate::frame;
use crate::payload::{Payload, SetupPayload};
use crate::result::RSocketResult;
use futures::future;
use futures::{Sink, SinkExt, Stream, StreamExt};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

pub type Single<T> = Pin<Box<dyn Send + Sync + Future<Output = RSocketResult<T>>>>;
// TODO: switch to reactor-rust.
pub type Mono<T> = Pin<Box<dyn Send + Sync + Future<Output = RSocketResult<T>>>>;
pub type Flux<T> = Pin<Box<dyn Send + Sync + Stream<Item = RSocketResult<T>>>>;

pub trait RSocket: Sync + Send {
fn metadata_push(&self, req: Payload) -> Single<()>;
fn fire_and_forget(&self, req: Payload) -> Single<()>;
fn request_response(&self, req: Payload) -> Single<Payload>;
fn metadata_push(&self, req: Payload) -> Mono<()>;
fn fire_and_forget(&self, req: Payload) -> Mono<()>;
fn request_response(&self, req: Payload) -> Mono<Payload>;
fn request_stream(&self, req: Payload) -> Flux<Payload>;
// fn request_channel(&self, reqs: Flux<Payload>) -> Flux<Payload>;
}

pub struct EchoRSocket;

impl RSocket for EchoRSocket {
fn metadata_push(&self, req: Payload) -> Single<()> {
fn metadata_push(&self, req: Payload) -> Mono<()> {
info!("echo metadata_push: {:?}", req);
Box::pin(future::ok::<(), RSocketError>(()))
}
fn fire_and_forget(&self, req: Payload) -> Single<()> {
fn fire_and_forget(&self, req: Payload) -> Mono<()> {
info!("echo fire_and_forget: {:?}", req);
Box::pin(future::ok::<(), RSocketError>(()))
}

fn request_response(&self, req: Payload) -> Single<Payload> {
fn request_response(&self, req: Payload) -> Mono<Payload> {
info!("echo request_response: {:?}", req);
Box::pin(future::ok::<Payload, RSocketError>(req))
}
fn request_stream(&self, req: Payload) -> Flux<Payload> {
info!("echo request_stream: {:?}", req);
Box::pin(futures::stream::iter(vec![
Ok(req.clone()),
Ok(req.clone()),
Ok(req),
]))
}
}

pub struct EmptyRSocket;
Expand All @@ -43,17 +54,21 @@ impl EmptyRSocket {
}

impl RSocket for EmptyRSocket {
fn metadata_push(&self, _req: Payload) -> Single<()> {
fn metadata_push(&self, _req: Payload) -> Mono<()> {
Box::pin(future::err(self.must_failed()))
}

fn fire_and_forget(&self, _req: Payload) -> Single<()> {
fn fire_and_forget(&self, _req: Payload) -> Mono<()> {
Box::pin(future::err(self.must_failed()))
}

fn request_response(&self, _req: Payload) -> Single<Payload> {
fn request_response(&self, _req: Payload) -> Mono<Payload> {
Box::pin(future::err(self.must_failed()))
}

fn request_stream(&self, req: Payload) -> Flux<Payload> {
Box::pin(futures::stream::empty())
}
}

pub type AcceptorGenerator = Arc<fn(SetupPayload, Box<dyn RSocket>) -> Box<dyn RSocket>>;
Expand Down
Loading

0 comments on commit 7ced3c6

Please sign in to comment.