diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-23 15:20:59 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2026-02-23 15:20:59 +0000 |
| commit | 113928aa84894ea8f65c247d9987527e792b32a9 (patch) | |
| tree | ec967d6195d9f7ec4f061449596611afe3a0950f /src/sync/self_subscriber.rs | |
| parent | 26f608e5011b9d1ad6036da75b89272835e69695 (diff) | |
| parent | e0ad39a489b3398f8208713bf728db0cb11475b0 (diff) | |
Merge master into 3ca0-announcements-purgatory
Diffstat (limited to 'src/sync/self_subscriber.rs')
| -rw-r--r-- | src/sync/self_subscriber.rs | 142 |
1 files changed, 119 insertions, 23 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index ab10c49..4d69c9a 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -16,6 +16,8 @@ use nostr_sdk::Timestamp; | |||
| 16 | use tokio::sync::broadcast::error::RecvError; | 16 | use tokio::sync::broadcast::error::RecvError; |
| 17 | use tokio::sync::{broadcast, mpsc}; | 17 | use tokio::sync::{broadcast, mpsc}; |
| 18 | 18 | ||
| 19 | use crate::nostr::builder::SharedDatabase; | ||
| 20 | |||
| 19 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel}; | 21 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds, SyncLevel}; |
| 20 | 22 | ||
| 21 | // ============================================================================= | 23 | // ============================================================================= |
| @@ -99,6 +101,8 @@ pub struct SelfSubscriber { | |||
| 99 | action_tx: mpsc::Sender<AddFilters>, | 101 | action_tx: mpsc::Sender<AddFilters>, |
| 100 | /// Last time we connected - used for since filter on reconnect | 102 | /// Last time we connected - used for since filter on reconnect |
| 101 | last_connected: Option<Timestamp>, | 103 | last_connected: Option<Timestamp>, |
| 104 | /// Database for querying existing events on startup | ||
| 105 | database: SharedDatabase, | ||
| 102 | } | 106 | } |
| 103 | 107 | ||
| 104 | impl SelfSubscriber { | 108 | impl SelfSubscriber { |
| @@ -109,11 +113,13 @@ impl SelfSubscriber { | |||
| 109 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) | 113 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) |
| 110 | /// * `repo_sync_index` - Shared index to update with discovered repos | 114 | /// * `repo_sync_index` - Shared index to update with discovered repos |
| 111 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager | 115 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager |
| 116 | /// * `database` - Database for querying existing events on startup | ||
| 112 | pub fn new( | 117 | pub fn new( |
| 113 | own_relay_url: String, | 118 | own_relay_url: String, |
| 114 | relay_domain: String, | 119 | relay_domain: String, |
| 115 | repo_sync_index: RepoSyncIndex, | 120 | repo_sync_index: RepoSyncIndex, |
| 116 | action_tx: mpsc::Sender<AddFilters>, | 121 | action_tx: mpsc::Sender<AddFilters>, |
| 122 | database: SharedDatabase, | ||
| 117 | ) -> Self { | 123 | ) -> Self { |
| 118 | Self { | 124 | Self { |
| 119 | own_relay_url, | 125 | own_relay_url, |
| @@ -121,6 +127,7 @@ impl SelfSubscriber { | |||
| 121 | repo_sync_index, | 127 | repo_sync_index, |
| 122 | action_tx, | 128 | action_tx, |
| 123 | last_connected: None, | 129 | last_connected: None, |
| 130 | database, | ||
| 124 | } | 131 | } |
| 125 | } | 132 | } |
| 126 | 133 | ||
| @@ -136,6 +143,102 @@ impl SelfSubscriber { | |||
| 136 | } | 143 | } |
| 137 | } | 144 | } |
| 138 | 145 | ||
| 146 | /// Load existing events from database on startup | ||
| 147 | /// | ||
| 148 | /// Queries the database with two separate queries to build the initial | ||
| 149 | /// PendingUpdates state. This ensures all repos get Layer 2/3 filters | ||
| 150 | /// created, not just those returned by the WebSocket subscription | ||
| 151 | /// (which has limits on the number of events returned). | ||
| 152 | /// | ||
| 153 | /// Query order: | ||
| 154 | /// 1. First query: Get announcements (30617) to populate repo_sync_index | ||
| 155 | /// with repos and their relays | ||
| 156 | /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event() | ||
| 157 | /// to add root event IDs for Layer 3 filter creation | ||
| 158 | /// | ||
| 159 | /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters. | ||
| 160 | async fn load_existing_events(&self) -> PendingUpdates { | ||
| 161 | let mut pending = PendingUpdates::new(); | ||
| 162 | |||
| 163 | tracing::info!("Loading all events from database"); | ||
| 164 | |||
| 165 | // First query: Get all announcements to populate repo_sync_index | ||
| 166 | let announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement); | ||
| 167 | |||
| 168 | let announcements = match self.database.query(announcement_filter).await { | ||
| 169 | Ok(events) => { | ||
| 170 | tracing::info!(count = events.len(), "Loaded announcements from database"); | ||
| 171 | events | ||
| 172 | } | ||
| 173 | Err(e) => { | ||
| 174 | tracing::error!( | ||
| 175 | error = %e, | ||
| 176 | "Failed to query announcements from database" | ||
| 177 | ); | ||
| 178 | return pending; | ||
| 179 | } | ||
| 180 | }; | ||
| 181 | |||
| 182 | // Process announcements | ||
| 183 | let mut announcements_loaded = 0; | ||
| 184 | for event in announcements.iter() { | ||
| 185 | if let Some(repo_id) = Self::extract_repo_id(event) { | ||
| 186 | let relays = Self::extract_relay_urls(event); | ||
| 187 | pending.add_repo(repo_id, relays, HashSet::new()); | ||
| 188 | announcements_loaded += 1; | ||
| 189 | } | ||
| 190 | } | ||
| 191 | |||
| 192 | // Update repo_sync_index with announcements BEFORE querying root events | ||
| 193 | { | ||
| 194 | let mut index = self.repo_sync_index.write().await; | ||
| 195 | for (repo_id, needs) in &pending.repos { | ||
| 196 | let entry = index | ||
| 197 | .entry(repo_id.clone()) | ||
| 198 | .or_insert_with(|| RepoSyncNeeds { | ||
| 199 | relays: HashSet::new(), | ||
| 200 | root_events: HashSet::new(), | ||
| 201 | sync_level: SyncLevel::StateOnly, | ||
| 202 | }); | ||
| 203 | entry.relays.extend(needs.relays.clone()); | ||
| 204 | } | ||
| 205 | } | ||
| 206 | |||
| 207 | // Second query: Get all root events for handle_root_event() | ||
| 208 | let root_filter = | ||
| 209 | Filter::new().kinds(vec![Kind::GitPatch, Kind::GitIssue, Kind::GitPullRequest]); | ||
| 210 | |||
| 211 | let root_events = match self.database.query(root_filter).await { | ||
| 212 | Ok(events) => { | ||
| 213 | tracing::info!(count = events.len(), "Loaded root events from database"); | ||
| 214 | events | ||
| 215 | } | ||
| 216 | Err(e) => { | ||
| 217 | tracing::error!( | ||
| 218 | error = %e, | ||
| 219 | "Failed to query root events from database" | ||
| 220 | ); | ||
| 221 | // Continue with just announcements | ||
| 222 | return pending; | ||
| 223 | } | ||
| 224 | }; | ||
| 225 | |||
| 226 | // Process root events | ||
| 227 | let mut root_events_processed = 0; | ||
| 228 | for event in root_events.iter() { | ||
| 229 | self.handle_root_event(event, &mut pending).await; | ||
| 230 | root_events_processed += 1; | ||
| 231 | } | ||
| 232 | |||
| 233 | tracing::info!( | ||
| 234 | announcements_loaded = announcements_loaded, | ||
| 235 | root_events_processed = root_events_processed, | ||
| 236 | "Processed existing events from database" | ||
| 237 | ); | ||
| 238 | |||
| 239 | pending | ||
| 240 | } | ||
| 241 | |||
| 139 | /// Process a relay pool notification | 242 | /// Process a relay pool notification |
| 140 | /// | 243 | /// |
| 141 | /// Handles incoming events from the subscription, queueing 30617 announcements | 244 | /// Handles incoming events from the subscription, queueing 30617 announcements |
| @@ -277,33 +380,22 @@ impl SelfSubscriber { | |||
| 277 | // Subscribe to announcement and root event kinds | 380 | // Subscribe to announcement and root event kinds |
| 278 | // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) | 381 | // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) |
| 279 | // Plus kind 10317 (User Grasp List) for GRASP discovery | 382 | // Plus kind 10317 (User Grasp List) for GRASP discovery |
| 280 | // Check if we have a last_connected time for reconnect filtering | 383 | let mut filter = Filter::new().kinds(vec![ |
| 281 | let filter = if let Some(last) = self.last_connected { | 384 | Kind::GitRepoAnnouncement, |
| 385 | Kind::GitPatch, | ||
| 386 | Kind::GitIssue, | ||
| 387 | Kind::GitPullRequest, | ||
| 388 | Kind::GitUserGraspList, | ||
| 389 | ]); | ||
| 390 | if let Some(timestamp) = self.last_connected { | ||
| 282 | // Quick reconnect - use since filter (15 min buffer) | 391 | // Quick reconnect - use since filter (15 min buffer) |
| 283 | let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); | 392 | let since = Timestamp::from(timestamp.as_secs().saturating_sub(15 * 60)); |
| 284 | tracing::debug!( | 393 | tracing::debug!( |
| 285 | since = %since, | 394 | since = %since, |
| 286 | "Using since filter for reconnect" | 395 | "Using since filter for reconnect" |
| 287 | ); | 396 | ); |
| 288 | Filter::new() | 397 | filter = filter.since(since); |
| 289 | .kinds(vec![ | 398 | } |
| 290 | Kind::GitRepoAnnouncement, // Repository Announcements | ||
| 291 | Kind::GitPatch, // Patches | ||
| 292 | Kind::GitIssue, // Issues | ||
| 293 | Kind::GitPullRequest, // Pull Requests | ||
| 294 | Kind::GitUserGraspList, // User Grasp List | ||
| 295 | ]) | ||
| 296 | .since(since) | ||
| 297 | } else { | ||
| 298 | // First connection - no since filter | ||
| 299 | Filter::new().kinds(vec![ | ||
| 300 | Kind::GitRepoAnnouncement, // Repository Announcements | ||
| 301 | Kind::GitPatch, // Patches | ||
| 302 | Kind::GitIssue, // Issues | ||
| 303 | Kind::GitPullRequest, // Pull Requests | ||
| 304 | Kind::GitUserGraspList, // User Grasp List | ||
| 305 | ]) | ||
| 306 | }; | ||
| 307 | 399 | ||
| 308 | // Update last_connected AFTER creating filter but BEFORE subscribing | 400 | // Update last_connected AFTER creating filter but BEFORE subscribing |
| 309 | self.last_connected = Some(Timestamp::now()); | 401 | self.last_connected = Some(Timestamp::now()); |
| @@ -324,7 +416,11 @@ impl SelfSubscriber { | |||
| 324 | 416 | ||
| 325 | let mut notifications = client.notifications(); | 417 | let mut notifications = client.notifications(); |
| 326 | let batch_window = Self::get_batch_window(); | 418 | let batch_window = Self::get_batch_window(); |
| 327 | let mut pending = PendingUpdates::new(); | 419 | |
| 420 | // Load existing events from database on startup | ||
| 421 | // This ensures all repos get Layer 2/3 filters created, not just those | ||
| 422 | // returned by the WebSocket subscription (which has limits) | ||
| 423 | let mut pending = self.load_existing_events().await; | ||
| 328 | 424 | ||
| 329 | // Timer does NOT reset on new events - use interval | 425 | // Timer does NOT reset on new events - use interval |
| 330 | let mut timer = tokio::time::interval(batch_window); | 426 | let mut timer = tokio::time::interval(batch_window); |