upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/test_utils/src/relay.rs
diff options
context:
space:
mode:
Diffstat (limited to 'test_utils/src/relay.rs')
-rw-r--r--test_utils/src/relay.rs126
1 files changed, 117 insertions, 9 deletions
diff --git a/test_utils/src/relay.rs b/test_utils/src/relay.rs
index 6de3618..ce618a3 100644
--- a/test_utils/src/relay.rs
+++ b/test_utils/src/relay.rs
@@ -5,26 +5,36 @@ use nostr::{ClientMessage, RelayMessage};
5 5
6use crate::CliTester; 6use crate::CliTester;
7 7
8type ListenerFunc<'a> = &'a dyn Fn(&mut Relay, u64, nostr::Event) -> Result<()>; 8type ListenerEventFunc<'a> = &'a dyn Fn(&mut Relay, u64, nostr::Event) -> Result<()>;
9pub type ListenerReqFunc<'a> =
10 &'a dyn Fn(&mut Relay, u64, nostr::SubscriptionId, Vec<nostr::Filter>) -> Result<()>;
9 11
10pub struct Relay<'a> { 12pub struct Relay<'a> {
11 port: u16, 13 port: u16,
12 event_hub: simple_websockets::EventHub, 14 event_hub: simple_websockets::EventHub,
13 clients: HashMap<u64, simple_websockets::Responder>, 15 clients: HashMap<u64, simple_websockets::Responder>,
14 pub events: Vec<nostr::Event>, 16 pub events: Vec<nostr::Event>,
15 event_listener: Option<ListenerFunc<'a>>, 17 pub reqs: Vec<Vec<nostr::Filter>>,
18 event_listener: Option<ListenerEventFunc<'a>>,
19 req_listener: Option<ListenerReqFunc<'a>>,
16} 20}
17 21
18impl<'a> Relay<'a> { 22impl<'a> Relay<'a> {
19 pub fn new(port: u16, event_listener: Option<ListenerFunc<'a>>) -> Self { 23 pub fn new(
24 port: u16,
25 event_listener: Option<ListenerEventFunc<'a>>,
26 req_listener: Option<ListenerReqFunc<'a>>,
27 ) -> Self {
20 let event_hub = simple_websockets::launch(port) 28 let event_hub = simple_websockets::launch(port)
21 .unwrap_or_else(|_| panic!("failed to listen on port {port}")); 29 .unwrap_or_else(|_| panic!("failed to listen on port {port}"));
22 Self { 30 Self {
23 port, 31 port,
24 events: vec![], 32 events: vec![],
33 reqs: vec![],
25 event_hub, 34 event_hub,
26 clients: HashMap::new(), 35 clients: HashMap::new(),
27 event_listener, 36 event_listener,
37 req_listener,
28 } 38 }
29 } 39 }
30 pub fn respond_ok( 40 pub fn respond_ok(
@@ -44,11 +54,54 @@ impl<'a> Relay<'a> {
44 // bail!(format!("{}", &ok_json)); 54 // bail!(format!("{}", &ok_json));
45 Ok(responder.send(simple_websockets::Message::Text(ok_json))) 55 Ok(responder.send(simple_websockets::Message::Text(ok_json)))
46 } 56 }
57
58 pub fn respond_eose(
59 &self,
60 client_id: u64,
61 subscription_id: nostr::SubscriptionId,
62 ) -> Result<bool> {
63 let responder = self.clients.get(&client_id).unwrap();
64
65 Ok(responder.send(simple_websockets::Message::Text(
66 RelayMessage::EndOfStoredEvents(subscription_id).as_json(),
67 )))
68 }
69
70 /// send events and eose
71 pub fn respond_events(
72 &self,
73 client_id: u64,
74 subscription_id: &nostr::SubscriptionId,
75 events: &Vec<nostr::Event>,
76 ) -> Result<bool> {
77 let responder = self.clients.get(&client_id).unwrap();
78
79 for event in events {
80 let res = responder.send(simple_websockets::Message::Text(
81 RelayMessage::Event {
82 subscription_id: subscription_id.clone(),
83 event: Box::new(event.clone()),
84 }
85 .as_json(),
86 ));
87 if !res {
88 return Ok(false);
89 }
90 }
91 self.respond_eose(client_id, subscription_id.clone())
92 }
93
94 pub fn shutdown(&mut self) -> Result<()> {
95 let (mut socket, _) = tungstenite::connect(format!("ws://localhost:{}", self.port))?;
96 socket.write(tungstenite::Message::text("shut me down"))?;
97 socket.close(None)?;
98 Ok(())
99 }
47 /// listen, collect events and responds with event_listener to events or 100 /// listen, collect events and responds with event_listener to events or
48 /// Ok(eventid) if event_listner is None 101 /// Ok(eventid) if event_listner is None
49 pub async fn listen_until_close(&mut self) -> Result<()> { 102 pub async fn listen_until_close(&mut self) -> Result<()> {
50 loop { 103 loop {
51 println!("polling"); 104 println!("{} polling", self.port);
52 match self.event_hub.poll_async().await { 105 match self.event_hub.poll_async().await {
53 simple_websockets::Event::Connect(client_id, responder) => { 106 simple_websockets::Event::Connect(client_id, responder) => {
54 // add their Responder to our `clients` map: 107 // add their Responder to our `clients` map:
@@ -65,8 +118,13 @@ impl<'a> Relay<'a> {
65 "Received a message from client #{}: {:?}", 118 "Received a message from client #{}: {:?}",
66 client_id, message 119 client_id, message
67 ); 120 );
68 121 if let simple_websockets::Message::Text(s) = message.clone() {
69 if let Ok(event) = get_nevent(message) { 122 if s.eq("shut me down") {
123 println!("{} recieved shut me down", self.port);
124 break;
125 }
126 }
127 if let Ok(event) = get_nevent(&message) {
70 self.events.push(event.clone()); 128 self.events.push(event.clone());
71 if let Some(listner) = self.event_listener { 129 if let Some(listner) = self.event_listener {
72 listner(self, client_id, event)?; 130 listner(self, client_id, event)?;
@@ -74,16 +132,40 @@ impl<'a> Relay<'a> {
74 self.respond_ok(client_id, event, None)?; 132 self.respond_ok(client_id, event, None)?;
75 } 133 }
76 } 134 }
135
136 if let Ok((subscription_id, filters)) = get_nreq(&message) {
137 self.reqs.push(filters.clone());
138 if let Some(listner) = self.req_listener {
139 listner(self, client_id, subscription_id, filters)?;
140 } else {
141 self.respond_eose(client_id, subscription_id)?;
142 }
143 // respond with events
144 // respond with EOSE
145 }
146 if is_nclose(&message) {
147 println!("{} recieved nostr close", self.port);
148 break;
149 }
77 } 150 }
78 } 151 }
79 } 152 }
80 println!("stop polling"); 153 println!(
81 println!("we may not be polling but the tcplistner is still listening"); 154 "{} stop polling. we may not be polling but the tcplistner is still listening",
155 self.port
156 );
82 Ok(()) 157 Ok(())
83 } 158 }
84} 159}
85 160
86fn get_nevent(message: simple_websockets::Message) -> Result<nostr::Event> { 161pub fn shutdown_relay(port: u64) -> Result<()> {
162 let (mut socket, _) = tungstenite::connect(format!("ws://localhost:{}", port))?;
163 socket.write(tungstenite::Message::text("shut me down"))?;
164 socket.close(None)?;
165 Ok(())
166}
167
168fn get_nevent(message: &simple_websockets::Message) -> Result<nostr::Event> {
87 if let simple_websockets::Message::Text(s) = message.clone() { 169 if let simple_websockets::Message::Text(s) = message.clone() {
88 let cm_result = ClientMessage::from_json(s); 170 let cm_result = ClientMessage::from_json(s);
89 if let Ok(ClientMessage::Event(event)) = cm_result { 171 if let Ok(ClientMessage::Event(event)) = cm_result {
@@ -94,6 +176,32 @@ fn get_nevent(message: simple_websockets::Message) -> Result<nostr::Event> {
94 bail!("not nostr event") 176 bail!("not nostr event")
95} 177}
96 178
179fn get_nreq(
180 message: &simple_websockets::Message,
181) -> Result<(nostr::SubscriptionId, Vec<nostr::Filter>)> {
182 if let simple_websockets::Message::Text(s) = message.clone() {
183 let cm_result = ClientMessage::from_json(s);
184 if let Ok(ClientMessage::Req {
185 subscription_id,
186 filters,
187 }) = cm_result
188 {
189 return Ok((subscription_id, filters));
190 }
191 }
192 bail!("not nostr event")
193}
194
195fn is_nclose(message: &simple_websockets::Message) -> bool {
196 if let simple_websockets::Message::Text(s) = message.clone() {
197 let cm_result = ClientMessage::from_json(s);
198 if let Ok(ClientMessage::Close(_)) = cm_result {
199 return true;
200 }
201 }
202 false
203}
204
97pub enum Message { 205pub enum Message {
98 Event, 206 Event,
99 // Request, 207 // Request,