irdest-sdk: fixing subscription system

develop
Katharina Fey 3 years ago
parent 7c31efc29a
commit 36e3361c40
Signed by: kookie
GPG Key ID: 90734A9E619C8A6C
  1. 24
      irdest-core/src/rpc.rs
  2. 8
      irdest-core/types/src/rpc/mod.rs
  3. 8
      rpc-core/irpc-sdk/src/io.rs
  4. 4
      rpc-core/irpc-sdk/src/subs.rs
  5. 25
      sdk/irdest-sdk/src/lib.rs
  6. 6
      sdk/irdest-sdk/tests/harness.rs

@ -12,8 +12,8 @@ use crate::{
error::Error,
helpers::TagSet,
types::rpc::{
Capabilities, MessageCapabilities, MessageReply, Reply, UserCapabilities, UserReply,
ADDRESS,
Capabilities, MessageCapabilities, MessageReply, Reply, SubscriptionReply,
UserCapabilities, UserReply, ADDRESS,
},
types::services::Service,
users::{UserAuth, UserProfile, UserUpdate},
@ -82,9 +82,9 @@ impl RpcServer {
self.socket
.listen(move |msg| {
let enc = this.serv.encoding();
debug!("Received an RPC request: {:?}!", msg);
info!("Received an RPC request from '{}'!", msg.from);
let req = dbg!(io::decode::<Capabilities>(enc, &msg.data)).unwrap();
let req = io::decode::<Capabilities>(enc, &msg.data).unwrap();
debug!("Request: {:?}", req);
@ -232,24 +232,16 @@ impl RpcServer {
break;
}
// Push message to socket
let r = Reply::Message(MessageReply::Message(new_msg));
socket
.reply(
_msg.clone().reply(
ADDRESS,
// Create a reply message
Reply::Message(MessageReply::Message(new_msg))
.to_json()
.as_bytes()
.to_vec(),
),
)
.reply(_msg.clone().reply(ADDRESS, r.to_json().as_bytes().to_vec()))
.await
.unwrap();
}
});
Reply::Subscription(msg.id)
Reply::Subscription(SubscriptionReply::Ok(msg.id))
}
Err(e) => Reply::Error(e),
}

@ -95,7 +95,7 @@ pub enum Reply {
Users(UserReply),
Message(MessageReply),
/// A special reply type that handles registering subscriptions
Subscription(Identity),
Subscription(SubscriptionReply),
/// A special reply type that wraps all error codes
Error(Error),
}
@ -127,3 +127,9 @@ pub enum MessageReply {
Message(Message),
MsgId(MsgId),
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
#[serde(tag = "type", content = "data", rename_all = "kebab-case")]
pub enum SubscriptionReply {
Ok(Identity),
}

@ -99,6 +99,12 @@ pub fn encode<S: Serialize>(enc: u8, msg: &S) -> RpcResult<Vec<u8>> {
pub fn decode<D: DeserializeOwned>(enc: u8, data: &Vec<u8>) -> RpcResult<D> {
Ok(match enc {
ENCODING_JSON => serde_json::from_str(std::str::from_utf8(data).unwrap())?,
_ => todo!(), // Old broker won't support new encoding
enc => {
error!(
"Received a message with unsupported encoding type `{}`",
enc
);
todo!()
} // Old broker won't support new encoding
})
}

@ -146,10 +146,10 @@ impl SubSwitch {
/// protocol). This ensures that the subscription is typed
/// correctly and can read the incoming stream. A second
/// serialisation is done in this function.
pub async fn forward<T: Serialize>(&self, id: Identity, vec: T) -> RpcResult<()> {
pub async fn forward(&self, id: Identity, vec: Vec<u8>) -> RpcResult<()> {
let map = self.map.read().await;
let sender = map.get(&id).ok_or(RpcError::NoSuchSubscription)?;
sender.send(io::encode(self.enc, &vec)?).await.unwrap();
sender.send(vec).await.unwrap();
Ok(())
}
}

@ -18,14 +18,16 @@ pub use irpc_sdk::{
default_socket_path,
error::{RpcError, RpcResult},
io::{self, Message},
RpcSocket, Service, SubSwitch, Subscription,
RpcSocket, Service, SubSwitch, Subscription, ENCODING_JSON,
};
pub use std::{str, sync::Arc};
use alexandria_tags::TagSet;
use async_std::task;
use messages::{IdType, Message as IrdestMessage, Mode, MsgId};
use rpc::{Capabilities, MessageReply, Reply, UserCapabilities, UserReply, ADDRESS};
use rpc::{
Capabilities, MessageReply, Reply, SubscriptionReply, UserCapabilities, UserReply, ADDRESS,
};
use services::Service as ServiceId;
use users::{UserAuth, UserProfile, UserUpdate};
@ -316,31 +318,20 @@ impl<'ir> MessageRpc<'ir> {
{
// Create a Subscription object and a task that pushes
// updates to it for incoming subscription events
Ok(Reply::Subscription(sub_id)) => {
let s = self.rpc.subs.create(0, sub_id).await;
Ok(Reply::Subscription(SubscriptionReply::Ok(sub_id))) => {
let s = self.rpc.subs.create(ENCODING_JSON, sub_id).await;
panic!("WE GOT A SUBSCRIPTION ID!!!!: {}", sub_id);
// Listen for events for this task
let rpc = Arc::clone(&self.rpc.socket.clone());
let subs = Arc::clone(&self.rpc.subs);
let enc = self.rpc.enc;
task::spawn(async move {
let subs = Arc::clone(&subs);
rpc.wait_for(sub_id, |Message { data, .. }| {
let subs = Arc::clone(&subs);
async move {
match io::decode::<Reply>(enc, &data).ok() {
Some(Reply::Message(MessageReply::Message(msg))) => {
subs.forward(sub_id, msg).await
}
_ => Err(RpcError::EncoderFault(
"Received invalid subscription payload!".into(),
)),
}
}
// Return this future to run
async move { subs.forward(sub_id, data).await }
})
.await
.unwrap();

@ -10,9 +10,11 @@ pub use async_std::future::timeout;
pub use ratman_harness::{millis, sec10, sec5};
pub fn parse_log_level() {
let filter = EnvFilter::try_from_env("QAUL_LOG")
let filter = EnvFilter::try_from_env("IRDEST_LOG")
.unwrap_or_default()
.add_directive(LevelFilter::DEBUG.into())
.add_directive("alexandria=warn".parse().unwrap())
.add_directive("ratman=warn".parse().unwrap())
.add_directive("async_std=error".parse().unwrap())
.add_directive("async_io=error".parse().unwrap())
.add_directive("polling=error".parse().unwrap())
@ -20,7 +22,7 @@ pub fn parse_log_level() {
// Initialise the logger
fmt().with_env_filter(filter).init();
info!("Initialised logger: welcome to qaul-hubd!");
info!("Initialised logger: welcome to irdest-hubd!");
}
pub struct TestServer {

Loading…
Cancel
Save