diff options
Diffstat (limited to 'test_utils/src/relay.rs')
| -rw-r--r-- | test_utils/src/relay.rs | 126 |
1 files changed, 117 insertions, 9 deletions
diff --git a/test_utils/src/relay.rs b/test_utils/src/relay.rs index 6de3618..ce618a3 100644 --- a/test_utils/src/relay.rs +++ b/test_utils/src/relay.rs | |||
| @@ -5,26 +5,36 @@ use nostr::{ClientMessage, RelayMessage}; | |||
| 5 | 5 | ||
| 6 | use crate::CliTester; | 6 | use crate::CliTester; |
| 7 | 7 | ||
| 8 | type ListenerFunc<'a> = &'a dyn Fn(&mut Relay, u64, nostr::Event) -> Result<()>; | 8 | type ListenerEventFunc<'a> = &'a dyn Fn(&mut Relay, u64, nostr::Event) -> Result<()>; |
| 9 | pub type ListenerReqFunc<'a> = | ||
| 10 | &'a dyn Fn(&mut Relay, u64, nostr::SubscriptionId, Vec<nostr::Filter>) -> Result<()>; | ||
| 9 | 11 | ||
| 10 | pub struct Relay<'a> { | 12 | pub struct Relay<'a> { |
| 11 | port: u16, | 13 | port: u16, |
| 12 | event_hub: simple_websockets::EventHub, | 14 | event_hub: simple_websockets::EventHub, |
| 13 | clients: HashMap<u64, simple_websockets::Responder>, | 15 | clients: HashMap<u64, simple_websockets::Responder>, |
| 14 | pub events: Vec<nostr::Event>, | 16 | pub events: Vec<nostr::Event>, |
| 15 | event_listener: Option<ListenerFunc<'a>>, | 17 | pub reqs: Vec<Vec<nostr::Filter>>, |
| 18 | event_listener: Option<ListenerEventFunc<'a>>, | ||
| 19 | req_listener: Option<ListenerReqFunc<'a>>, | ||
| 16 | } | 20 | } |
| 17 | 21 | ||
| 18 | impl<'a> Relay<'a> { | 22 | impl<'a> Relay<'a> { |
| 19 | pub fn new(port: u16, event_listener: Option<ListenerFunc<'a>>) -> Self { | 23 | pub fn new( |
| 24 | port: u16, | ||
| 25 | event_listener: Option<ListenerEventFunc<'a>>, | ||
| 26 | req_listener: Option<ListenerReqFunc<'a>>, | ||
| 27 | ) -> Self { | ||
| 20 | let event_hub = simple_websockets::launch(port) | 28 | let event_hub = simple_websockets::launch(port) |
| 21 | .unwrap_or_else(|_| panic!("failed to listen on port {port}")); | 29 | .unwrap_or_else(|_| panic!("failed to listen on port {port}")); |
| 22 | Self { | 30 | Self { |
| 23 | port, | 31 | port, |
| 24 | events: vec![], | 32 | events: vec![], |
| 33 | reqs: vec![], | ||
| 25 | event_hub, | 34 | event_hub, |
| 26 | clients: HashMap::new(), | 35 | clients: HashMap::new(), |
| 27 | event_listener, | 36 | event_listener, |
| 37 | req_listener, | ||
| 28 | } | 38 | } |
| 29 | } | 39 | } |
| 30 | pub fn respond_ok( | 40 | pub fn respond_ok( |
| @@ -44,11 +54,54 @@ impl<'a> Relay<'a> { | |||
| 44 | // bail!(format!("{}", &ok_json)); | 54 | // bail!(format!("{}", &ok_json)); |
| 45 | Ok(responder.send(simple_websockets::Message::Text(ok_json))) | 55 | Ok(responder.send(simple_websockets::Message::Text(ok_json))) |
| 46 | } | 56 | } |
| 57 | |||
| 58 | pub fn respond_eose( | ||
| 59 | &self, | ||
| 60 | client_id: u64, | ||
| 61 | subscription_id: nostr::SubscriptionId, | ||
| 62 | ) -> Result<bool> { | ||
| 63 | let responder = self.clients.get(&client_id).unwrap(); | ||
| 64 | |||
| 65 | Ok(responder.send(simple_websockets::Message::Text( | ||
| 66 | RelayMessage::EndOfStoredEvents(subscription_id).as_json(), | ||
| 67 | ))) | ||
| 68 | } | ||
| 69 | |||
| 70 | /// send events and eose | ||
| 71 | pub fn respond_events( | ||
| 72 | &self, | ||
| 73 | client_id: u64, | ||
| 74 | subscription_id: &nostr::SubscriptionId, | ||
| 75 | events: &Vec<nostr::Event>, | ||
| 76 | ) -> Result<bool> { | ||
| 77 | let responder = self.clients.get(&client_id).unwrap(); | ||
| 78 | |||
| 79 | for event in events { | ||
| 80 | let res = responder.send(simple_websockets::Message::Text( | ||
| 81 | RelayMessage::Event { | ||
| 82 | subscription_id: subscription_id.clone(), | ||
| 83 | event: Box::new(event.clone()), | ||
| 84 | } | ||
| 85 | .as_json(), | ||
| 86 | )); | ||
| 87 | if !res { | ||
| 88 | return Ok(false); | ||
| 89 | } | ||
| 90 | } | ||
| 91 | self.respond_eose(client_id, subscription_id.clone()) | ||
| 92 | } | ||
| 93 | |||
| 94 | pub fn shutdown(&mut self) -> Result<()> { | ||
| 95 | let (mut socket, _) = tungstenite::connect(format!("ws://localhost:{}", self.port))?; | ||
| 96 | socket.write(tungstenite::Message::text("shut me down"))?; | ||
| 97 | socket.close(None)?; | ||
| 98 | Ok(()) | ||
| 99 | } | ||
| 47 | /// listen, collect events and responds with event_listener to events or | 100 | /// listen, collect events and responds with event_listener to events or |
| 48 | /// Ok(eventid) if event_listner is None | 101 | /// Ok(eventid) if event_listner is None |
| 49 | pub async fn listen_until_close(&mut self) -> Result<()> { | 102 | pub async fn listen_until_close(&mut self) -> Result<()> { |
| 50 | loop { | 103 | loop { |
| 51 | println!("polling"); | 104 | println!("{} polling", self.port); |
| 52 | match self.event_hub.poll_async().await { | 105 | match self.event_hub.poll_async().await { |
| 53 | simple_websockets::Event::Connect(client_id, responder) => { | 106 | simple_websockets::Event::Connect(client_id, responder) => { |
| 54 | // add their Responder to our `clients` map: | 107 | // add their Responder to our `clients` map: |
| @@ -65,8 +118,13 @@ impl<'a> Relay<'a> { | |||
| 65 | "Received a message from client #{}: {:?}", | 118 | "Received a message from client #{}: {:?}", |
| 66 | client_id, message | 119 | client_id, message |
| 67 | ); | 120 | ); |
| 68 | 121 | if let simple_websockets::Message::Text(s) = message.clone() { | |
| 69 | if let Ok(event) = get_nevent(message) { | 122 | if s.eq("shut me down") { |
| 123 | println!("{} recieved shut me down", self.port); | ||
| 124 | break; | ||
| 125 | } | ||
| 126 | } | ||
| 127 | if let Ok(event) = get_nevent(&message) { | ||
| 70 | self.events.push(event.clone()); | 128 | self.events.push(event.clone()); |
| 71 | if let Some(listner) = self.event_listener { | 129 | if let Some(listner) = self.event_listener { |
| 72 | listner(self, client_id, event)?; | 130 | listner(self, client_id, event)?; |
| @@ -74,16 +132,40 @@ impl<'a> Relay<'a> { | |||
| 74 | self.respond_ok(client_id, event, None)?; | 132 | self.respond_ok(client_id, event, None)?; |
| 75 | } | 133 | } |
| 76 | } | 134 | } |
| 135 | |||
| 136 | if let Ok((subscription_id, filters)) = get_nreq(&message) { | ||
| 137 | self.reqs.push(filters.clone()); | ||
| 138 | if let Some(listner) = self.req_listener { | ||
| 139 | listner(self, client_id, subscription_id, filters)?; | ||
| 140 | } else { | ||
| 141 | self.respond_eose(client_id, subscription_id)?; | ||
| 142 | } | ||
| 143 | // respond with events | ||
| 144 | // respond with EOSE | ||
| 145 | } | ||
| 146 | if is_nclose(&message) { | ||
| 147 | println!("{} recieved nostr close", self.port); | ||
| 148 | break; | ||
| 149 | } | ||
| 77 | } | 150 | } |
| 78 | } | 151 | } |
| 79 | } | 152 | } |
| 80 | println!("stop polling"); | 153 | println!( |
| 81 | println!("we may not be polling but the tcplistner is still listening"); | 154 | "{} stop polling. we may not be polling but the tcplistner is still listening", |
| 155 | self.port | ||
| 156 | ); | ||
| 82 | Ok(()) | 157 | Ok(()) |
| 83 | } | 158 | } |
| 84 | } | 159 | } |
| 85 | 160 | ||
| 86 | fn get_nevent(message: simple_websockets::Message) -> Result<nostr::Event> { | 161 | pub fn shutdown_relay(port: u64) -> Result<()> { |
| 162 | let (mut socket, _) = tungstenite::connect(format!("ws://localhost:{}", port))?; | ||
| 163 | socket.write(tungstenite::Message::text("shut me down"))?; | ||
| 164 | socket.close(None)?; | ||
| 165 | Ok(()) | ||
| 166 | } | ||
| 167 | |||
| 168 | fn get_nevent(message: &simple_websockets::Message) -> Result<nostr::Event> { | ||
| 87 | if let simple_websockets::Message::Text(s) = message.clone() { | 169 | if let simple_websockets::Message::Text(s) = message.clone() { |
| 88 | let cm_result = ClientMessage::from_json(s); | 170 | let cm_result = ClientMessage::from_json(s); |
| 89 | if let Ok(ClientMessage::Event(event)) = cm_result { | 171 | if let Ok(ClientMessage::Event(event)) = cm_result { |
| @@ -94,6 +176,32 @@ fn get_nevent(message: simple_websockets::Message) -> Result<nostr::Event> { | |||
| 94 | bail!("not nostr event") | 176 | bail!("not nostr event") |
| 95 | } | 177 | } |
| 96 | 178 | ||
| 179 | fn get_nreq( | ||
| 180 | message: &simple_websockets::Message, | ||
| 181 | ) -> Result<(nostr::SubscriptionId, Vec<nostr::Filter>)> { | ||
| 182 | if let simple_websockets::Message::Text(s) = message.clone() { | ||
| 183 | let cm_result = ClientMessage::from_json(s); | ||
| 184 | if let Ok(ClientMessage::Req { | ||
| 185 | subscription_id, | ||
| 186 | filters, | ||
| 187 | }) = cm_result | ||
| 188 | { | ||
| 189 | return Ok((subscription_id, filters)); | ||
| 190 | } | ||
| 191 | } | ||
| 192 | bail!("not nostr event") | ||
| 193 | } | ||
| 194 | |||
| 195 | fn is_nclose(message: &simple_websockets::Message) -> bool { | ||
| 196 | if let simple_websockets::Message::Text(s) = message.clone() { | ||
| 197 | let cm_result = ClientMessage::from_json(s); | ||
| 198 | if let Ok(ClientMessage::Close(_)) = cm_result { | ||
| 199 | return true; | ||
| 200 | } | ||
| 201 | } | ||
| 202 | false | ||
| 203 | } | ||
| 204 | |||
| 97 | pub enum Message { | 205 | pub enum Message { |
| 98 | Event, | 206 | Event, |
| 99 | // Request, | 207 | // Request, |