diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2024-01-26 19:17:48 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2024-01-26 19:17:48 +0000 |
| commit | e0f543e8adb144f6deff6ff7ea0c412c9fcac5b4 (patch) | |
| tree | 1d70d9414273ca29f666821d98d5dd6f5f197071 /src/client.rs | |
| parent | 15879248571e2753395ecbeb52e67bd2a9ad9db0 (diff) | |
fix: fetching messages with many unreliable relays
the tool had not been tested with large number of user relays, some of
which are misbehaving. It works well when sending events to relays but
struggles when fetching messages.
it seems to crash when accessing a large number of relays. this change
queues up relays so many are not connected to at the same time.
it also shows more verbose messages about its connection and success
with relays.
many of the tests will fail as a result of this change as I havn't
updated them to expect details of more relay interaction.
further changes are urgently needed to improve the speed of fetching
events.
- relay interaction UI should reflect the smooth approach used for
sending events
- we don't need to fetch user events from every relay
- we could show the user information that we have already collected
and allow them to interact
Diffstat (limited to 'src/client.rs')
| -rw-r--r-- | src/client.rs | 59 |
1 files changed, 36 insertions, 23 deletions
diff --git a/src/client.rs b/src/client.rs index 7418519..9aa574d 100644 --- a/src/client.rs +++ b/src/client.rs | |||
| @@ -12,7 +12,7 @@ | |||
| 12 | // want to inadvertlty use other features of nightly that might be removed. | 12 | // want to inadvertlty use other features of nightly that might be removed. |
| 13 | use anyhow::{Context, Result}; | 13 | use anyhow::{Context, Result}; |
| 14 | use async_trait::async_trait; | 14 | use async_trait::async_trait; |
| 15 | use futures::future::join_all; | 15 | use futures::stream::{self, StreamExt}; |
| 16 | #[cfg(test)] | 16 | #[cfg(test)] |
| 17 | use mockall::*; | 17 | use mockall::*; |
| 18 | use nostr::Event; | 18 | use nostr::Event; |
| @@ -50,10 +50,10 @@ impl Connect for Client { | |||
| 50 | ] | 50 | ] |
| 51 | } else { | 51 | } else { |
| 52 | vec![ | 52 | vec![ |
| 53 | "wss://relayable.org".to_string(), | 53 | "wss://purplepages.es".to_string(), |
| 54 | "wss://relay.f7z.io".to_string(), | ||
| 55 | "wss://relay.damus.io".to_string(), | 54 | "wss://relay.damus.io".to_string(), |
| 56 | "wss://relay.snort.social".to_string(), | 55 | "wss://nostr-pub.wellorder.net".to_string(), |
| 56 | "wss://nos.lol".to_string(), | ||
| 57 | // "ws://localhost:8080".to_string() | 57 | // "ws://localhost:8080".to_string() |
| 58 | ] | 58 | ] |
| 59 | }; | 59 | }; |
| @@ -124,19 +124,31 @@ impl Connect for Client { | |||
| 124 | 124 | ||
| 125 | let relays_map = self.client.relays().await; | 125 | let relays_map = self.client.relays().await; |
| 126 | 126 | ||
| 127 | let relay_results = join_all( | 127 | let futures: Vec<_> = relays |
| 128 | relays | 128 | .clone() |
| 129 | .clone() | 129 | .iter() |
| 130 | .iter() | 130 | .map(|r| { |
| 131 | .map(|r| { | 131 | ( |
| 132 | ( | 132 | relays_map.get(&nostr::Url::parse(r).unwrap()).unwrap(), |
| 133 | relays_map.get(&nostr::Url::parse(r).unwrap()).unwrap(), | 133 | filters.clone(), |
| 134 | filters.clone(), | 134 | ) |
| 135 | ) | 135 | }) |
| 136 | }) | 136 | .map(|(relay, filters)| async { |
| 137 | .map(|(relay, filters)| get_events_of(relay, filters)), | 137 | if !relay.is_connected().await { |
| 138 | ) | 138 | relay.connect(false).await; |
| 139 | .await; | 139 | } |
| 140 | |||
| 141 | match get_events_of(relay, filters).await { | ||
| 142 | Err(error) => { | ||
| 143 | println!("{} {}", error, relay.url()); | ||
| 144 | Err(error) | ||
| 145 | } | ||
| 146 | res => res, | ||
| 147 | } | ||
| 148 | }) | ||
| 149 | .collect(); | ||
| 150 | |||
| 151 | let relay_results = stream::iter(futures).buffer_unordered(5).collect().await; | ||
| 140 | 152 | ||
| 141 | Ok(get_dedup_events(relay_results)) | 153 | Ok(get_dedup_events(relay_results)) |
| 142 | } | 154 | } |
| @@ -146,18 +158,19 @@ async fn get_events_of( | |||
| 146 | relay: &nostr_sdk::Relay, | 158 | relay: &nostr_sdk::Relay, |
| 147 | filters: Vec<nostr::Filter>, | 159 | filters: Vec<nostr::Filter>, |
| 148 | ) -> Result<Vec<Event>> { | 160 | ) -> Result<Vec<Event>> { |
| 149 | if !relay.is_connected().await { | 161 | println!("fetching from {}", relay.url()); |
| 150 | relay.connect(true).await; | 162 | |
| 151 | } | 163 | let events = relay |
| 152 | relay | ||
| 153 | .get_events_of( | 164 | .get_events_of( |
| 154 | filters, | 165 | filters, |
| 155 | // 20 is nostr_sdk default | 166 | // 20 is nostr_sdk default |
| 156 | std::time::Duration::from_secs(20), | 167 | std::time::Duration::from_secs(10), |
| 157 | nostr_sdk::FilterOptions::ExitOnEOSE, | 168 | nostr_sdk::FilterOptions::ExitOnEOSE, |
| 158 | ) | 169 | ) |
| 159 | .await | 170 | .await |
| 160 | .context("failed to get events from relay") | 171 | .context("failed to get events from relay")?; |
| 172 | println!("fetched {} events from {}", events.len(), relay.url()); | ||
| 173 | Ok(events) | ||
| 161 | } | 174 | } |
| 162 | 175 | ||
| 163 | #[derive(Default)] | 176 | #[derive(Default)] |