First working example of an RPC connection

develop
Katharina Fey 3 years ago
parent 52e9882658
commit 189dec7785
Signed by: kookie
GPG Key ID: 90734A9E619C8A6C
  1. 9
      Cargo.lock
  2. 18
      docs/developer/src/social/contributions.md
  3. 28
      docs/user/src/irdest-hubd/config.md
  4. 18
      docs/user/src/nix.md
  5. 9
      irdest-core/src/rpc.rs
  6. 3
      irdest-core/types/src/rpc/mod.rs
  7. 3
      irdest-core/types/src/services.rs
  8. 7
      rpc-core/irpc-broker/src/lib.rs
  9. 10
      rpc-core/irpc-broker/src/map.rs
  10. 15
      rpc-core/irpc-broker/src/proto.rs
  11. 2
      rpc-core/irpc-sdk/src/error.rs
  12. 13
      rpc-core/irpc-sdk/src/io.rs
  13. 18
      rpc-core/irpc-sdk/src/subs.rs
  14. 11
      sdk/irdest-sdk/Cargo.toml
  15. 116
      sdk/irdest-sdk/src/lib.rs
  16. 29
      sdk/irdest-sdk/tests/basic.rs
  17. 90
      sdk/irdest-sdk/tests/harness.rs
  18. 6
      services/ping/src/main.rs

9
Cargo.lock generated

@ -11,7 +11,7 @@ dependencies = [
[[package]]
name = "alexandria"
version = "0.2.1"
version = "0.2.0"
dependencies = [
"alexandria-tags",
"async-std",
@ -1358,8 +1358,15 @@ dependencies = [
name = "irdest-sdk"
version = "0.1.0"
dependencies = [
"alexandria-tags",
"async-std",
"ircore-types",
"irdest-core",
"irpc-broker",
"irpc-sdk",
"ratman-harness",
"tracing",
"tracing-subscriber",
]
[[package]]

@ -33,3 +33,21 @@ The easiest way of doing this is to configure `git send-email`.
- Don't send HTML e-mail!
- Make sure your line-wrapping is wide enough to allow the patch to
stay un-wrapped!
## Lorri & direnv
You can enable automatic environment loading when you enter the
irdest repository, by configuring [lorri] and [direnv] on your system.
[lorri]: https://github.com/nix-community/lorri
[direnv]: https://direnv.net/
```console
❤ (uwu) ~/p/code> cd irdest
direnv: loading ~/projects/code/irdest/.envrc
direnv: export +AR +AR_FOR_TARGET +AS +AS_FOR_TARGET +CC
// ... snip ...
❤ (uwu) ~/p/c/irdest> cargo build lorri-keep-env-hack-irdest
...
```

@ -1,17 +1,17 @@
# Configuration
In order for `qaul-hubd` to work properly it will have to run in the
In order for `irdest-hubd` to work properly it will have to run in the
background to handle incoming and outgoing network connections. It's
recommended to launch it via a user systemd unit.
```systemd
[Unit]
Description=qaul hub daemon
Description=irdest hub daemon
After=network.target
[Service]
Type=simple
ExecStart=$HOME/bin/qaul-hubd <your parameters here>
ExecStart=$HOME/bin/irdest-hubd <your parameters here>
```
Save this file in `~/.local/share/systemd/user/`
@ -20,23 +20,23 @@ Now you can reload the daemon and start the unit.
```console
$ systemctl daemon-reload --user
$ systemctl enable --user qaul-hubd.service
$ systemctl start --user qaul-hubd.service
$ systemctl enable --user irdest-hubd.service
$ systemctl start --user irdest-hubd.service
```
## Available configuration
Following is a list of qaul-hubd configuration values. Those marked
Following is a list of irdest-hubd configuration values. Those marked
with a `*` are mandatory. Commandline arguments take precedence over
environment variables.
| ENV variable | Runtime argument | Description |
|----------------------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `*` HUBD_PEERS=[PATH] | -P / --peers [PATH] | Specify the path to a peer file, containing a newline-separated list of peers to connect to |
| `*` HUBD_PORT=[PORT] | -p / --port [PORT] | Specify a tcp port to which qaul-hubd should bind itself to listen for incoming network traffic |
| HUBD_UDP_DISCOVERY=0 | --no-udp-discover | Prevent qaul-hubd from registering a multicast address to find other clients on the same network. Some networks may forbid this, or cause performance issues. |
| HUBD_SETUP_UPNP=0 | --no-upnp | Disable automatic UPNP port forwarding. Some networks may forbid this, or cause performance issues. |
| HUBD_RUN_MODE=[MODE] | -m / --mode [MODE] | Specify the peering mode of this hub. Possible values: "static", "dynamic" |
| HUBD_ADDR=[ADDR] | -a / --addr [ADDR] | A valid address to bind to. Must be a valid ip address format. |
| ENV variable | Runtime argument | Description |
|-----------------------|---------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `*` HUBD_PEERS=[PATH] | -P / --peers [PATH] | Specify the path to a peer file, containing a newline-separated list of peers to connect to |
| `*` HUBD_PORT=[PORT] | -p / --port [PORT] | Specify a tcp port to which irdest-hubd should bind itself to listen for incoming network traffic |
| HUBD_UDP_DISCOVERY=0 | --no-udp-discover | Prevent irdest-hubd from registering a multicast address to find other clients on the same network. Some networks may forbid this, or cause performance issues. |
| HUBD_SETUP_UPNP=0 | --no-upnp | Disable automatic UPNP port forwarding. Some networks may forbid this, or cause performance issues. |
| HUBD_RUN_MODE=[MODE] | -m / --mode [MODE] | Specify the peering mode of this hub. Possible values: "static", "dynamic" |
| HUBD_ADDR=[ADDR] | -a / --addr [ADDR] | A valid address to bind to. Must be a valid ip address format. |

@ -19,21 +19,3 @@ Afterwards you can simple run `cargo build --bin irdest-hubd
--release` to build a new hubd binary.
The output artifact will be written to `./target/release/irdest-hubd`.
## Lorri & direnv
You can enable automatic environment loading when you enter the
irdest repository, by configuring [lorri] and [direnv] on your system.
[lorri]: https://github.com/target/lorri
[direnv]: https://direnv.net/
```console
❤ (uwu) ~/p/code> cd irdest
direnv: loading ~/projects/code/irdest/.envrc
direnv: export +AR +AR_FOR_TARGET +AS +AS_FOR_TARGET +CC
// ... snip ...
❤ (uwu) ~/p/c/irdest> cargo build lorri-keep-env-hack-irdest
...
```

@ -82,11 +82,11 @@ impl RpcServer {
self.socket
.listen(move |msg| {
let enc = this.serv.encoding();
debug!("Received an RPC request: {:?}!", msg);
let req = io::decode::<String>(enc, &msg.data)
.ok()
.and_then(|json| Capabilities::from_json(&json))
.unwrap();
let req = dbg!(io::decode::<Capabilities>(enc, &msg.data)).unwrap();
debug!("Request: {:?}", req);
let _this = Arc::clone(&this);
task::spawn(async move { _this.spawn_on_request(msg, req).await });
@ -95,6 +95,7 @@ impl RpcServer {
}
async fn spawn_on_request(self: &Arc<Self>, msg: Message, cap: Capabilities) {
debug!("Executing capability: {:?}", cap);
use Capabilities::*;
use MessageCapabilities as MsgCap;
use UserCapabilities as UserCap;

@ -11,7 +11,7 @@ mod tests;
use crate::{
error::Error,
messages::{IdType, Message, Mode, MsgQuery},
messages::{IdType, Message, Mode, MsgId, MsgQuery},
services::Service,
users::{UserAuth, UserProfile, UserUpdate},
Identity,
@ -125,4 +125,5 @@ pub enum UserReply {
pub enum MessageReply {
Ok,
Message(Message),
MsgId(MsgId),
}

@ -110,7 +110,8 @@ impl DerefMut for MetadataMap {
&mut self.map
}
}
/// Represents a service using irdest
/// Represents a service using libqaul
///
/// Via this type it's possible to either perform actions as a
/// particular survice, or none, which means that all service's events

@ -25,6 +25,10 @@ pub struct Broker {
impl Broker {
pub async fn new() -> RpcResult<Arc<Self>> {
let (addr, port) = default_socket_path();
Self::bind(addr, port).await
}
pub async fn bind(addr: &str, port: u16) -> RpcResult<Arc<Self>> {
let _conns = ServiceMap::new();
// Create a new RpcSocket that listens for new connections and
@ -50,9 +54,8 @@ fn reader_loop(mut stream: TcpStream, map: Arc<ServiceMap>) {
}
// Then create a run-loop where we continuously handle incoming messages
debug!("Listening for incoming messages");
loop {
debug!("Listening for incoming messages");
// Some errors are fatal, others are not
match handle_packet(&mut stream, &map).await {
Ok(()) => {}

@ -47,6 +47,16 @@ impl ServiceMap {
.ok_or_else(|| RpcError::NoSuchService(name.clone()))
}
/// Get the active TCP stream for a service
pub(crate) async fn stream(&self, name: &String) -> RpcResult<TcpStream> {
self.inner
.read()
.await
.get(name)
.map(|serv| serv.io.clone())
.ok_or_else(|| RpcError::NoSuchService(name.clone()))
}
/// Register a new service
pub(crate) async fn register(
&self,

@ -76,12 +76,23 @@ pub(crate) async fn handle_sdk_command(
/// Handle messages address to another service
pub(crate) async fn proxy_message(
stream: &mut TcpStream,
_return: &mut TcpStream,
map: &Arc<ServiceMap>,
msg: Message,
) -> RpcResult<()> {
debug!("Proxying message from '{}' to '{}'", msg.from, msg.to);
// Fetch the target service capabilities
let caps = map.caps(&msg.to).await?;
match map.stream(&msg.to).await {
Ok(ref mut stream) => io::send(stream, caps.encoding, &msg).await,
Err(_) => {
warn!(
"Swallowing message addressed to unknown service: '{}'",
msg.to
);
Ok(())
Ok(())
}
}
}

@ -5,7 +5,7 @@ use std::fmt::{self, Display, Formatter};
pub type RpcResult<T> = Result<T, RpcError>;
/// A set of errors that occur when connecting to services
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum RpcError {
/// No such service was found by the broker
NoSuchService(String),

@ -15,6 +15,19 @@ pub struct Message {
pub data: Vec<u8>,
}
impl std::fmt::Debug for Message {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"Message {{ id: {}, to: {}, from: {}, data: {} }}",
self.id,
self.to,
self.from,
std::str::from_utf8(&self.data).unwrap_or("<unprintable>")
)
}
}
impl Message {
/// Create a new message to an address
pub fn to_addr(to: &str, from: &str, data: Vec<u8>) -> Self {

@ -28,7 +28,7 @@ pub(crate) type ArcBool = Arc<AtomicBool>;
///
/// Following is an overview of the subscription message flow.
///
/// ```no_run
/// ```text
/// [ Your Service ] [ Remote Service ]
/// SubscriptionCmd::Register ----------->
/// <------------- SdkReply::Identity
@ -117,7 +117,7 @@ where
pub struct SubSwitch {
enc: u8,
// Maybe lock this
map: BTreeMap<Identity, Sender<Vec<u8>>>,
map: RwLock<BTreeMap<Identity, Sender<Vec<u8>>>>,
}
impl SubSwitch {
@ -130,19 +130,27 @@ impl SubSwitch {
}
/// Create new subscription on the switch
pub fn create<T>(&mut self, encoding: u8) -> Subscription<T>
pub async fn create<T>(&self, encoding: u8) -> Subscription<T>
where
T: DeserializeOwned,
{
let (tx, rx) = bounded(8);
let id = Identity::random();
self.map.insert(id.clone(), tx);
self.map.write().await.insert(id.clone(), tx);
Subscription::new(rx, encoding, id)
}
/// Send message push data to subscription handler
///
/// When calling `forward` you may want to peel the concrete
/// message type of your subscription object from the carrier that
/// your service is notified with (depending on your service
/// protocol). This ensures that the subscription is typed
/// correctly and can read the incoming stream. A second
/// serialisation is done in this function.
pub async fn forward<T: Serialize>(&self, id: Identity, vec: T) -> RpcResult<()> {
let sender = self.map.get(&id).ok_or(RpcError::NoSuchSubscription)?;
let map = self.map.read().await;
let sender = map.get(&id).ok_or(RpcError::NoSuchSubscription)?;
sender.send(io::encode(self.enc, &vec)?).await.unwrap();
Ok(())
}

@ -1,6 +1,6 @@
[package]
name = "irdest-sdk"
description = "Client SDK for libqaul"
description = "Client SDK for irdest"
version = "0.1.0"
authors = ["Katharina Fey <kookie@spacekookie.de>"]
edition = "2018"
@ -9,3 +9,12 @@ license = "AGPL-3.0-or-later"
[dependencies]
ircore-types = { version = "0.1", path = "../../irdest-core/types" }
irpc-sdk = { version = "0.1", path = "../../rpc-core/irpc-sdk" }
alexandria-tags = { version = "0.2", path = "../../utils/alexandria-tags" }
tracing = "0.1"
[dev-dependencies]
irdest-core = { path = "../../irdest-core" }
irpc-broker = { path = "../../rpc-core/irpc-broker" }
ratman-harness = { path = "../../ratman/harness" }
async-std = { version = "1.0", features = ["attributes"] }
tracing-subscriber = { version = "0.2", features = ["fmt"] }

@ -1,61 +1,74 @@
//! qaul development SDK.
//! Irdest development SDK.
//!
//! This SDK provides you with asynchronous access to all of the core
//! irdest function interfaces, while being connected to a remote
//! server instance via the irdest-rpc (irpc) system.
//!
//! To interact with the irpc system you need to create a
//! [`Service`](irpc_sdk::Service), which is registered with the
//! remote RPC broker.
//!
//! The API surface is exposed via the `QaulRpc` type, while data
//! types are exposed via the `libqaul-types` crate (re-exported from
//! this crate via [`types`]).
//!
//! Check the qrpc-sdk documentation to learn how to use this crate.
//! ```rust
//! use irpc_sdk::Service;
//! use irdest_sdk::IrdestSdk;
//! ```
pub use ircore_types::*;
pub use irpc_sdk::{
default_socket_path,
error::{RpcError, RpcResult},
io::Message,
io::{self, Message},
RpcSocket, Service,
};
pub use std::{str, sync::Arc};
use rpc::{Capabilities, Reply, UserCapabilities, UserReply, ADDRESS};
use alexandria_tags::TagSet;
use messages::{IdType, Mode, MsgId};
use rpc::{Capabilities, MessageReply, Reply, UserCapabilities, UserReply, ADDRESS};
use services::Service as ServiceId;
use users::UserAuth;
/// A qrpc wrapper for libqaul
/// A irpc wrapper for irdest-core
///
/// This component exposes a public API surface to mirror the libqaul
/// crate. This means that other clients on the qrpc bus can include
/// this surface to get access to all libqaul functions, thate are
/// transparently mapped to the underlying libqaul instance
/// This component exposes a public API surface to mirror the irdest-core
/// crate. This means that other clients on the irpc bus can include
/// this surface to get access to all irdest-core functions, thate are
/// transparently mapped to the underlying irdest-core instance
/// potentially running in a different process.
pub struct QaulRpc {
pub struct IrdestSdk {
socket: Arc<RpcSocket>,
addr: String,
enc: u8,
}
impl QaulRpc {
impl IrdestSdk {
pub fn connect(service: &Service) -> RpcResult<Self> {
let socket = service.socket();
let addr = service.name.clone();
Ok(Self { socket, addr })
let enc = service.encoding();
Ok(Self { socket, addr, enc })
}
pub fn users<'q>(&'q self) -> UserRpc<'q> {
pub fn users<'ir>(&'ir self) -> UserRpc<'ir> {
UserRpc { rpc: self }
}
pub fn messages<'ir>(&'ir self) -> MessageRpc<'ir> {
MessageRpc { rpc: self }
}
async fn send(&self, cap: Capabilities) -> RpcResult<Reply> {
let json = cap.to_json();
let msg = Message::to_addr(ADDRESS, &self.addr, json.as_bytes().to_vec());
self.socket
.send(msg, |Message { data, .. }| {
match str::from_utf8(data.as_slice())
.ok()
.and_then(|json| Reply::from_json(json))
{
match io::decode::<Reply>(self.enc, &data).ok() {
// Map the Reply::Error field to a Rust error
Some(Reply::Error(e)) => Err(RpcError::Other(e.to_string())),
None => Err(RpcError::EncoderFault("Invalid json payload!".into())),
None => Err(RpcError::EncoderFault(
"Received invalid json payload!".into(),
)),
Some(r) => Ok(r),
}
})
@ -63,12 +76,12 @@ impl QaulRpc {
}
}
pub struct UserRpc<'q> {
rpc: &'q QaulRpc,
pub struct UserRpc<'ir> {
rpc: &'ir IrdestSdk,
}
impl<'q> UserRpc<'q> {
pub async fn create<S: Into<String>>(&'q self, pw: S) -> RpcResult<UserAuth> {
impl<'ir> UserRpc<'ir> {
pub async fn create<S: Into<String>>(&'ir self, pw: S) -> RpcResult<UserAuth> {
if let Reply::Users(UserReply::Auth(auth)) = self
.rpc
.send(Capabilities::Users(UserCapabilities::Create {
@ -81,4 +94,55 @@ impl<'q> UserRpc<'q> {
Err(RpcError::EncoderFault("Invalid reply payload!".into()))
}
}
pub async fn is_authenticated(&'ir self, auth: UserAuth) -> RpcResult<()> {
match self
.rpc
.send(Capabilities::Users(UserCapabilities::IsAuthenticated {
auth,
}))
.await
{
Ok(Reply::Users(UserReply::Ok)) => Ok(()),
Err(e) => Err(e),
_ => Err(RpcError::EncoderFault("Invalid reply payload!".into())),
}
}
}
pub struct MessageRpc<'ir> {
rpc: &'ir IrdestSdk,
}
impl<'ir> MessageRpc<'ir> {
pub async fn send<S, T>(
&'ir self,
auth: UserAuth,
mode: Mode,
id_type: IdType,
service: S,
tags: T,
payload: Vec<u8>,
) -> RpcResult<MsgId>
where
S: Into<ServiceId>,
T: Into<TagSet>,
{
match self
.rpc
.send(Capabilities::Messages(rpc::MessageCapabilities::Send {
auth,
mode,
id_type,
service: service.into(),
tags: tags.into(),
payload,
}))
.await
{
Ok(Reply::Message(MessageReply::MsgId(id))) => Ok(id),
Err(e) => Err(e),
_ => Err(RpcError::EncoderFault("Invalid reply payload!".into())),
}
}
}

@ -0,0 +1,29 @@
mod harness;
use harness::RpcState;
use irdest_sdk::IrdestSdk;
use irpc_sdk::error::RpcResult;
/// A simple test that connects to an Irdest instance over RPC
#[async_std::test]
async fn user_create() -> RpcResult<()> {
// Create a small test network with 2 RPC sockets
let state = RpcState::new(6060, 7070).await;
// Register a service on one of them
let serv = harness::make_service(6060).await?;
// Initialise Irdest SDK
let sdk = IrdestSdk::connect(&serv)?;
// Create a user
let auth = sdk
.users()
.create("dont write your passwords in unit tests duh")
.await?;
println!("User auth: {:?}", auth);
let is_auth = dbg!(sdk.users().is_authenticated(auth.clone()).await);
assert_eq!(is_auth, Ok(()));
Ok(())
}

@ -0,0 +1,90 @@
use irdest_core::{rpc::RpcServer, Irdest, IrdestRef};
use irpc_broker::Broker;
use irpc_sdk::{error::RpcResult, Capabilities, RpcSocket, Service};
use ratman_harness::{Initialize, ThreePoint};
use std::{sync::Arc, time::Duration};
use tracing::info;
use tracing_subscriber::{filter::LevelFilter, fmt, EnvFilter};
pub use async_std::future::timeout;
pub use ratman_harness::{millis, sec10, sec5};
fn parse_log_level() {
let filter = EnvFilter::try_from_env("QAUL_LOG")
.unwrap_or_default()
.add_directive(LevelFilter::DEBUG.into())
.add_directive("async_std=error".parse().unwrap())
.add_directive("async_io=error".parse().unwrap())
.add_directive("polling=error".parse().unwrap())
.add_directive("mio=error".parse().unwrap());
// Initialise the logger
fmt().with_env_filter(filter).init();
info!("Initialised logger: welcome to qaul-hubd!");
}
pub struct TestServer {
inner: Arc<RpcServer>,
port: u16,
}
impl TestServer {
/// Create an RPC server with a random binding
pub async fn new(ir: IrdestRef, port: u16) -> Self {
Self {
port,
inner: RpcServer::new(ir, "127.0.0.1", port).await.unwrap(),
}
}
}
#[allow(unused)]
pub struct RpcState {
pub tp: ThreePoint<Arc<Irdest>>,
// Node A state
rpc_a: TestServer,
broker_a: Arc<Broker>,
// Node B state
// rpc_b: TestServer,
// broker_b: Arc<Broker>,
}
impl RpcState {
pub async fn new(a: u16, b: u16) -> Self {
// parse_log_level(); // If something doesn't work, enable this line!
let tp = init().await;
let broker_a = Broker::bind("127.0.0.1", a).await.unwrap();
let rpc_a = TestServer::new(Arc::clone(&tp.a.1.as_ref().unwrap()), a).await;
// let broker_b = Broker::bind("127.0.0.1", b).await.unwrap();
// let rpc_b = TestServer::new(Arc::clone(&tp.b.1.as_ref().unwrap()), b).await;
Self {
tp,
rpc_a,
broker_a,
// rpc_b,
// broker_b,
}
}
}
pub async fn zzz(dur: Duration) {
async_std::task::sleep(dur).await
}
pub async fn make_service(port: u16) -> RpcResult<Service> {
let socket = RpcSocket::connect("127.0.0.1", port).await?;
let mut service = Service::new("test", 1, "A test service");
service
.register(&socket, Capabilities::basic_json())
.await?;
Ok(service)
}
async fn init() -> ThreePoint<Arc<Irdest>> {
let mut tp = ThreePoint::new().await;
tp.init_with(|_, arc| Irdest::new(arc));
tp
}

@ -6,7 +6,7 @@
//! considered documentation. If you find anything that is unclear to
//! you, or could be commented better, please send us a patch (or MR).
use irdest_sdk::{users::UserAuth, QaulRpc};
use irdest_sdk::{users::UserAuth, IrdestSdk};
use irpc_sdk::{default_socket_path, Capabilities, RpcSocket, Service};
use tracing::{error, info};
use tracing_subscriber::{filter::LevelFilter, fmt, EnvFilter};
@ -55,8 +55,8 @@ async fn main() {
info!("Received service ID '{}' from qrpc-broker", id);
let libqaul = QaulRpc::connect(&serv).unwrap();
let u: UserAuth = libqaul
let core = IrdestSdk::connect(&serv).unwrap();
let u: UserAuth = core
.users()
.create("foo bar baz my password is bad")
.await

Loading…
Cancel
Save