diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2023-10-01 00:00:00 +0100 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2023-10-01 00:00:00 +0100 |
| commit | e237328ec611a5891586530c1d3cb26c16c1093b (patch) | |
| tree | 22ac36baa240354d06ae82eb070609fa3e3fcb82 /test_utils/src/relay.rs | |
| parent | 000901c0cbca8464b5a89bcc93c5474f6564bafd (diff) | |
feat(login) fetch user relays and metadata
get user relay list and metadata events from relays when keys are
used and last fetch attempt was more than an hour ago
uses user's write relays if known, otherwise uses fallback relays
to achieve this a method for intergration testing event fetching
from relays was added
Diffstat (limited to 'test_utils/src/relay.rs')
| -rw-r--r-- | test_utils/src/relay.rs | 126 |
1 files changed, 117 insertions, 9 deletions
diff --git a/test_utils/src/relay.rs b/test_utils/src/relay.rs index 6de3618..ce618a3 100644 --- a/test_utils/src/relay.rs +++ b/test_utils/src/relay.rs | |||
| @@ -5,26 +5,36 @@ use nostr::{ClientMessage, RelayMessage}; | |||
| 5 | 5 | ||
| 6 | use crate::CliTester; | 6 | use crate::CliTester; |
| 7 | 7 | ||
| 8 | type ListenerFunc<'a> = &'a dyn Fn(&mut Relay, u64, nostr::Event) -> Result<()>; | 8 | type ListenerEventFunc<'a> = &'a dyn Fn(&mut Relay, u64, nostr::Event) -> Result<()>; |
| 9 | pub type ListenerReqFunc<'a> = | ||
| 10 | &'a dyn Fn(&mut Relay, u64, nostr::SubscriptionId, Vec<nostr::Filter>) -> Result<()>; | ||
| 9 | 11 | ||
| 10 | pub struct Relay<'a> { | 12 | pub struct Relay<'a> { |
| 11 | port: u16, | 13 | port: u16, |
| 12 | event_hub: simple_websockets::EventHub, | 14 | event_hub: simple_websockets::EventHub, |
| 13 | clients: HashMap<u64, simple_websockets::Responder>, | 15 | clients: HashMap<u64, simple_websockets::Responder>, |
| 14 | pub events: Vec<nostr::Event>, | 16 | pub events: Vec<nostr::Event>, |
| 15 | event_listener: Option<ListenerFunc<'a>>, | 17 | pub reqs: Vec<Vec<nostr::Filter>>, |
| 18 | event_listener: Option<ListenerEventFunc<'a>>, | ||
| 19 | req_listener: Option<ListenerReqFunc<'a>>, | ||
| 16 | } | 20 | } |
| 17 | 21 | ||
| 18 | impl<'a> Relay<'a> { | 22 | impl<'a> Relay<'a> { |
| 19 | pub fn new(port: u16, event_listener: Option<ListenerFunc<'a>>) -> Self { | 23 | pub fn new( |
| 24 | port: u16, | ||
| 25 | event_listener: Option<ListenerEventFunc<'a>>, | ||
| 26 | req_listener: Option<ListenerReqFunc<'a>>, | ||
| 27 | ) -> Self { | ||
| 20 | let event_hub = simple_websockets::launch(port) | 28 | let event_hub = simple_websockets::launch(port) |
| 21 | .unwrap_or_else(|_| panic!("failed to listen on port {port}")); | 29 | .unwrap_or_else(|_| panic!("failed to listen on port {port}")); |
| 22 | Self { | 30 | Self { |
| 23 | port, | 31 | port, |
| 24 | events: vec![], | 32 | events: vec![], |
| 33 | reqs: vec![], | ||
| 25 | event_hub, | 34 | event_hub, |
| 26 | clients: HashMap::new(), | 35 | clients: HashMap::new(), |
| 27 | event_listener, | 36 | event_listener, |
| 37 | req_listener, | ||
| 28 | } | 38 | } |
| 29 | } | 39 | } |
| 30 | pub fn respond_ok( | 40 | pub fn respond_ok( |
| @@ -44,11 +54,54 @@ impl<'a> Relay<'a> { | |||
| 44 | // bail!(format!("{}", &ok_json)); | 54 | // bail!(format!("{}", &ok_json)); |
| 45 | Ok(responder.send(simple_websockets::Message::Text(ok_json))) | 55 | Ok(responder.send(simple_websockets::Message::Text(ok_json))) |
| 46 | } | 56 | } |
| 57 | |||
| 58 | pub fn respond_eose( | ||
| 59 | &self, | ||
| 60 | client_id: u64, | ||
| 61 | subscription_id: nostr::SubscriptionId, | ||
| 62 | ) -> Result<bool> { | ||
| 63 | let responder = self.clients.get(&client_id).unwrap(); | ||
| 64 | |||
| 65 | Ok(responder.send(simple_websockets::Message::Text( | ||
| 66 | RelayMessage::EndOfStoredEvents(subscription_id).as_json(), | ||
| 67 | ))) | ||
| 68 | } | ||
| 69 | |||
| 70 | /// send events and eose | ||
| 71 | pub fn respond_events( | ||
| 72 | &self, | ||
| 73 | client_id: u64, | ||
| 74 | subscription_id: &nostr::SubscriptionId, | ||
| 75 | events: &Vec<nostr::Event>, | ||
| 76 | ) -> Result<bool> { | ||
| 77 | let responder = self.clients.get(&client_id).unwrap(); | ||
| 78 | |||
| 79 | for event in events { | ||
| 80 | let res = responder.send(simple_websockets::Message::Text( | ||
| 81 | RelayMessage::Event { | ||
| 82 | subscription_id: subscription_id.clone(), | ||
| 83 | event: Box::new(event.clone()), | ||
| 84 | } | ||
| 85 | .as_json(), | ||
| 86 | )); | ||
| 87 | if !res { | ||
| 88 | return Ok(false); | ||
| 89 | } | ||
| 90 | } | ||
| 91 | self.respond_eose(client_id, subscription_id.clone()) | ||
| 92 | } | ||
| 93 | |||
| 94 | pub fn shutdown(&mut self) -> Result<()> { | ||
| 95 | let (mut socket, _) = tungstenite::connect(format!("ws://localhost:{}", self.port))?; | ||
| 96 | socket.write(tungstenite::Message::text("shut me down"))?; | ||
| 97 | socket.close(None)?; | ||
| 98 | Ok(()) | ||
| 99 | } | ||
| 47 | /// listen, collect events and responds with event_listener to events or | 100 | /// listen, collect events and responds with event_listener to events or |
| 48 | /// Ok(eventid) if event_listner is None | 101 | /// Ok(eventid) if event_listner is None |
| 49 | pub async fn listen_until_close(&mut self) -> Result<()> { | 102 | pub async fn listen_until_close(&mut self) -> Result<()> { |
| 50 | loop { | 103 | loop { |
| 51 | println!("polling"); | 104 | println!("{} polling", self.port); |
| 52 | match self.event_hub.poll_async().await { | 105 | match self.event_hub.poll_async().await { |
| 53 | simple_websockets::Event::Connect(client_id, responder) => { | 106 | simple_websockets::Event::Connect(client_id, responder) => { |
| 54 | // add their Responder to our `clients` map: | 107 | // add their Responder to our `clients` map: |
| @@ -65,8 +118,13 @@ impl<'a> Relay<'a> { | |||
| 65 | "Received a message from client #{}: {:?}", | 118 | "Received a message from client #{}: {:?}", |
| 66 | client_id, message | 119 | client_id, message |
| 67 | ); | 120 | ); |
| 68 | 121 | if let simple_websockets::Message::Text(s) = message.clone() { | |
| 69 | if let Ok(event) = get_nevent(message) { | 122 | if s.eq("shut me down") { |
| 123 | println!("{} recieved shut me down", self.port); | ||
| 124 | break; | ||
| 125 | } | ||
| 126 | } | ||
| 127 | if let Ok(event) = get_nevent(&message) { | ||
| 70 | self.events.push(event.clone()); | 128 | self.events.push(event.clone()); |
| 71 | if let Some(listner) = self.event_listener { | 129 | if let Some(listner) = self.event_listener { |
| 72 | listner(self, client_id, event)?; | 130 | listner(self, client_id, event)?; |
| @@ -74,16 +132,40 @@ impl<'a> Relay<'a> { | |||
| 74 | self.respond_ok(client_id, event, None)?; | 132 | self.respond_ok(client_id, event, None)?; |
| 75 | } | 133 | } |
| 76 | } | 134 | } |
| 135 | |||
| 136 | if let Ok((subscription_id, filters)) = get_nreq(&message) { | ||
| 137 | self.reqs.push(filters.clone()); | ||
| 138 | if let Some(listner) = self.req_listener { | ||
| 139 | listner(self, client_id, subscription_id, filters)?; | ||
| 140 | } else { | ||
| 141 | self.respond_eose(client_id, subscription_id)?; | ||
| 142 | } | ||
| 143 | // respond with events | ||
| 144 | // respond with EOSE | ||
| 145 | } | ||
| 146 | if is_nclose(&message) { | ||
| 147 | println!("{} recieved nostr close", self.port); | ||
| 148 | break; | ||
| 149 | } | ||
| 77 | } | 150 | } |
| 78 | } | 151 | } |
| 79 | } | 152 | } |
| 80 | println!("stop polling"); | 153 | println!( |
| 81 | println!("we may not be polling but the tcplistner is still listening"); | 154 | "{} stop polling. we may not be polling but the tcplistner is still listening", |
| 155 | self.port | ||
| 156 | ); | ||
| 82 | Ok(()) | 157 | Ok(()) |
| 83 | } | 158 | } |
| 84 | } | 159 | } |
| 85 | 160 | ||
| 86 | fn get_nevent(message: simple_websockets::Message) -> Result<nostr::Event> { | 161 | pub fn shutdown_relay(port: u64) -> Result<()> { |
| 162 | let (mut socket, _) = tungstenite::connect(format!("ws://localhost:{}", port))?; | ||
| 163 | socket.write(tungstenite::Message::text("shut me down"))?; | ||
| 164 | socket.close(None)?; | ||
| 165 | Ok(()) | ||
| 166 | } | ||
| 167 | |||
| 168 | fn get_nevent(message: &simple_websockets::Message) -> Result<nostr::Event> { | ||
| 87 | if let simple_websockets::Message::Text(s) = message.clone() { | 169 | if let simple_websockets::Message::Text(s) = message.clone() { |
| 88 | let cm_result = ClientMessage::from_json(s); | 170 | let cm_result = ClientMessage::from_json(s); |
| 89 | if let Ok(ClientMessage::Event(event)) = cm_result { | 171 | if let Ok(ClientMessage::Event(event)) = cm_result { |
| @@ -94,6 +176,32 @@ fn get_nevent(message: simple_websockets::Message) -> Result<nostr::Event> { | |||
| 94 | bail!("not nostr event") | 176 | bail!("not nostr event") |
| 95 | } | 177 | } |
| 96 | 178 | ||
| 179 | fn get_nreq( | ||
| 180 | message: &simple_websockets::Message, | ||
| 181 | ) -> Result<(nostr::SubscriptionId, Vec<nostr::Filter>)> { | ||
| 182 | if let simple_websockets::Message::Text(s) = message.clone() { | ||
| 183 | let cm_result = ClientMessage::from_json(s); | ||
| 184 | if let Ok(ClientMessage::Req { | ||
| 185 | subscription_id, | ||
| 186 | filters, | ||
| 187 | }) = cm_result | ||
| 188 | { | ||
| 189 | return Ok((subscription_id, filters)); | ||
| 190 | } | ||
| 191 | } | ||
| 192 | bail!("not nostr event") | ||
| 193 | } | ||
| 194 | |||
| 195 | fn is_nclose(message: &simple_websockets::Message) -> bool { | ||
| 196 | if let simple_websockets::Message::Text(s) = message.clone() { | ||
| 197 | let cm_result = ClientMessage::from_json(s); | ||
| 198 | if let Ok(ClientMessage::Close(_)) = cm_result { | ||
| 199 | return true; | ||
| 200 | } | ||
| 201 | } | ||
| 202 | false | ||
| 203 | } | ||
| 204 | |||
| 97 | pub enum Message { | 205 | pub enum Message { |
| 98 | Event, | 206 | Event, |
| 99 | // Request, | 207 | // Request, |