nixos/nats: fix test

main
Jos van Bakel 2 years ago
parent 652aa6df2c
commit daab8fb3af
No known key found for this signature in database
GPG Key ID: 37589FBAE4DA2BC3
  1. 24
      nixos/tests/nats.nix

@ -45,21 +45,19 @@ in import ./make-test-python.nix ({ pkgs, lib, ... }: {
"{}"
).format(" ".join(args))
def parallel(*fns):
from threading import Thread
threads = [ Thread(target=fn) for fn in fns ]
for t in threads: t.start()
for t in threads: t.join()
start_all()
server.wait_for_unit("nats.service")
client1.fail("test -f ${file}")
# Subscribe on topic on client1 and echo messages to file.
client1.execute("({} | tee ${file} &)".format(nats_cmd("sub", "--raw", "${topic}")))
# Give client1 some time to subscribe.
client1.execute("sleep 2")
# Publish message on client2.
client2.execute(nats_cmd("pub", "${topic}", "hello"))
# Check if message has been received.
client1.succeed("grep -q hello ${file}")
with subtest("pub sub"):
parallel(
lambda: client1.succeed(nats_cmd("sub", "--count", "1", "${topic}")),
lambda: client2.succeed("sleep 2 && {}".format(nats_cmd("pub", "${topic}", "hello"))),
)
'';
})

Loading…
Cancel
Save