irdest-sdk: correctly fix the subscription test

develop
Katharina Fey 3 years ago
parent 8a2f4a8e74
commit 1a45349ec4
Signed by: kookie
GPG Key ID: 90734A9E619C8A6C
  1. 58
      rpc-core/irpc-sdk/src/subs.rs
  2. 17
      sdk/irdest-sdk/src/lib.rs
  3. 5
      sdk/irdest-sdk/tests/basic.rs
  4. 1
      utils/alexandria-tags/src/lib.rs

@ -1,6 +1,6 @@
use crate::{
error::{RpcError, RpcResult},
io, Identity,
io, Identity, ENCODING_JSON,
};
use async_std::{
channel::{bounded, Receiver, Sender},
@ -21,59 +21,39 @@ pub(crate) type ArcBool = Arc<AtomicBool>;
/// A generic subscription type
///
/// Use this type in your component SDK to make it possible for users
/// to get updates for a particular stream of data. Use
/// [`SubscriptionCmd`](crate::proto::SubscriptionCmd) to encode the
/// subscription creation handshake. A subscription object is then
/// generic over the type returned by the subscription stream.
/// to get updates for a particular stream of data. **There is no
/// universal subsciption command**, which means that your SDK
/// protocol needs to implement this. The following example looks at
/// the `irdest-core` interface, provided by `irdest-sdk`.
///
/// Following is an overview of the subscription message flow.
///
/// ```text
/// [ Your Service ] [ Remote Service ]
/// SubscriptionCmd::Register ----------->
/// <------------- SdkReply::Identity
/// Messages::Subscription(...) ----------->
/// <------------- Reply::Subscription(Identity)
///
/// ...
///
/// <------------- SubscriptionCmd::Push
/// <------------- SubscriptionCmd::Push
/// <------------- Reply::Messages(Message)
/// <------------- Reply::Messages(Message)
///
/// ...
///
/// SubscriptionCmd::Unregister --------->
/// <------------- SdkReply::Ok
/// Messages::StopSubscription ------------>
/// <------------- Reply::Messages(Ok)
/// ```
///
/// Because subscriptions need code running on both ends of the RPC
/// connection, there are two utility types you can use to map
/// connection, there are two utility types you SHOULD use to map
/// subscriptions to and from the RPC connection.
///
/// * [`SubSwitch`](crate::SubSwitch) - maps from RPC to
/// SDK-Subscription (this type)
/// * [`SubManager`](crate::SubManager) - maps service side
/// subscriptions to the RPC stream
/// * [`SubManager`](crate::SubManager) - run by the server, and maps
/// subscriptions to an RPC connection
/// * [`SubSwitch`](crate::SubSwitch) - run by the client, and maps
/// incoming RPC messages to the `Subscription` type to poll
///
/// ## How to create a subscription
///
/// Because all of this is still very theoretical, let's walk through
/// a complete example. It's recomended to wrap all of this code in
/// abstractions so that users of your SDK don't have to worry about
/// this, but the following example doesn't use any extra
/// abstractions.
///
/// ```rust
/// # use irpc_sdk::{*, error::RpcResult};
/// # async fn test() -> RpcResult<()> {
/// # let (addr, port) = default_socket_path();
///
/// // Create an RPC socket and register a service
/// let socket = RpcSocket::connect(addr, port).await?;
/// let mut service = Service::new("sub.test", 1, "Testing subscriptions");
/// service.register(&socket, Capabilities::basic_json()).await?;
///
/// //
/// # Ok(()) }
/// ```
/// TODO: create a simple code example that creates a subscription
pub struct Subscription<T>
where
T: DeserializeOwned,
@ -146,10 +126,10 @@ impl SubSwitch {
/// protocol). This ensures that the subscription is typed
/// correctly and can read the incoming stream. A second
/// serialisation is done in this function.
pub async fn forward(&self, id: Identity, vec: Vec<u8>) -> RpcResult<()> {
pub async fn forward<T: Serialize>(&self, id: Identity, msg: &T) -> RpcResult<()> {
let map = self.map.read().await;
let sender = map.get(&id).ok_or(RpcError::NoSuchSubscription)?;
sender.send(vec).await.unwrap();
sender.send(io::encode(ENCODING_JSON, &msg)?).await.unwrap();
Ok(())
}
}

@ -13,6 +13,9 @@
//! use irdest_sdk::IrdestSdk;
//! ```
#[macro_use]
extern crate tracing;
pub use ircore_types::*;
pub use irpc_sdk::{
default_socket_path,
@ -324,14 +327,24 @@ impl<'ir> MessageRpc<'ir> {
// Listen for events for this task
let rpc = Arc::clone(&self.rpc.socket.clone());
let subs = Arc::clone(&self.rpc.subs);
let enc = self.rpc.enc;
task::spawn(async move {
let subs = Arc::clone(&subs);
rpc.wait_for(sub_id, |Message { data, .. }| {
let subs = Arc::clone(&subs);
// Return this future to run
async move { subs.forward(sub_id, data).await }
async move {
match io::decode(enc, &data) {
Ok(Reply::Message(MessageReply::Message(ref msg))) => {
subs.forward(sub_id, msg).await
}
_ => {
warn!("Received invalid subscription payload; dropping!");
Ok(())
}
}
}
})
.await
.unwrap();

@ -79,9 +79,8 @@ async fn subscription() -> RpcResult<()> {
.unwrap();
});
while let Ok(msg) = sub.next().await {
println!("Received message: {} => {:?}", msg.id, msg.payload);
}
let msg = sub.next().await.unwrap();
assert_eq!(msg.payload, "Hello you!".as_bytes().to_vec());
Ok(())
}

@ -271,7 +271,6 @@ impl Serialize for Tag {
where
S: Serializer,
{
dbg!();
let mut state = ser.serialize_struct("Tag", 2)?;
state.serialize_field("key", &self.key)?;
state.serialize_field("val", &HumanVec(self.val.clone()))?;

Loading…
Cancel
Save