Skip to content

Commit

Permalink
[wip] refactor(iroh-gossip)!: dispatch gossip events and updates by t…
Browse files Browse the repository at this point in the history
…opic (#2570)

## Description

This PR changes the main public `iroh_gossip` to keep track of
client-side gossip subscriptions. The `net::Gossip` struct now keeps
track of client-side subscribers per topic, which are made up of a pair
of two streams/channels: from the client to the actor a stream of
updates (outgoing messages) and from the actor to the client a stream of
events (incoming messages). Once all client streams&sinks for a topic
are dropped, the topic is being quit.

This builds on the client API added in #2258, but completely removes the
`dispatcher` module, integrating its features directly into the gossip
actor. See below for a short list of the API changes. The new API can be
browsed
[here](https://n0-computer.github.io/iroh/pr/2570/docs/iroh/gossip/net/index.html).

The refactor turned out bigger than initially intended, sorry for that,
but I did not see a good way to reduce the scope.

What's still missing (can also be follow-ups)?:

- [ ] Review the new public API
- [ ] Align the client API to the iroh_gossip API. The `GossipTopic` can
be made to work on both the client and the native API, as it only deals
with streams and sinks.

## Breaking Changes

* `iroh_gossip::dispatcher` is removed with everything that was in it.
use the new API from `iroh_gossip::net::Gossip` instead (see below).
* `iroh_gossip::net::Gossip` methods changed:
  * changed: `join` now returns a `GossipTopic`
* removed: `broadcast`, `broadcast_neighbors`, `subscribe`,
`subscribe_all`, `quit`.
* for `subscribe` use `join` instead, which returns a `GossipTopic`
* for `broadcast` and `broadcast_neighbors` use the respective methods
on `GossipTopic` .
* `quit` is obsolete now, the topic will be quitted once all
`GossipTopic` handles are dropped.
      * `subscribe_all` is no longer available
* `iroh_gossip::net::JoinTopicFut` is removed (is now obsolete)

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [x] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- [ ] Tests if relevant.
- [ ] All breaking changes documented.
  • Loading branch information
Frando authored Aug 5, 2024
1 parent 55836fa commit bdc1c45
Show file tree
Hide file tree
Showing 19 changed files with 971 additions and 1,140 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions iroh-cli/src/commands/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,22 @@ impl GossipCommands {
line = input_lines.next_line() => {
let line = line.context("failed to read from stdin")?;
if let Some(line) = line {
sink.send(iroh_gossip::dispatcher::Command::Broadcast(line.into())).await?;
sink.send(iroh_gossip::net::Command::Broadcast(line.into())).await?;
} else {
break;
}
}
res = stream.next() => {
let res = res.context("gossip stream ended")?.context("failed to read gossip stream")?;
match res {
iroh_gossip::dispatcher::Event::Gossip(event) => {
iroh_gossip::net::Event::Gossip(event) => {
if verbose {
println!("{:?}", event);
} else if let iroh_gossip::dispatcher::GossipEvent::Received(iroh_gossip::dispatcher::Message { content, .. }) = event {
} else if let iroh_gossip::net::GossipEvent::Received(iroh_gossip::net::Message { content, .. }) = event {
println!("{:?}", content);
}
}
iroh_gossip::dispatcher::Event::Lagged => {
iroh_gossip::net::Event::Lagged => {
anyhow::bail!("gossip stream lagged");
}
};
Expand Down
11 changes: 1 addition & 10 deletions iroh-docs/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use tracing::{error, error_span, Instrument};
use crate::{actor::SyncHandle, ContentStatus, ContentStatusCallback, Entry, NamespaceId};
use crate::{Author, AuthorId};

use self::gossip::GossipActor;
use self::live::{LiveActor, ToLiveActor};

pub use self::live::SyncEvent;
Expand Down Expand Up @@ -69,7 +68,6 @@ impl Engine {
default_author_storage: DefaultAuthorStorage,
) -> anyhow::Result<Self> {
let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
let (to_gossip_actor, to_gossip_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
let me = endpoint.node_id().fmt_short();

let content_status_cb = {
Expand All @@ -86,17 +84,10 @@ impl Engine {
downloader,
to_live_actor_recv,
live_actor_tx.clone(),
to_gossip_actor,
);
let gossip_actor = GossipActor::new(
to_gossip_actor_recv,
sync.clone(),
gossip,
live_actor_tx.clone(),
);
let actor_handle = tokio::task::spawn(
async move {
if let Err(err) = actor.run(gossip_actor).await {
if let Err(err) = actor.run().await {
error!("sync actor failed: {err:?}");
}
}
Expand Down
Loading

0 comments on commit bdc1c45

Please sign in to comment.