diff options
Diffstat (limited to 'src/sync/self_subscriber.rs')
| -rw-r--r-- | src/sync/self_subscriber.rs | 167 |
1 files changed, 144 insertions, 23 deletions
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index 3cc408d..e9505f1 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -16,6 +16,8 @@ use nostr_sdk::Timestamp; | |||
| 16 | use tokio::sync::broadcast::error::RecvError; | 16 | use tokio::sync::broadcast::error::RecvError; |
| 17 | use tokio::sync::{broadcast, mpsc}; | 17 | use tokio::sync::{broadcast, mpsc}; |
| 18 | 18 | ||
| 19 | use crate::nostr::builder::SharedDatabase; | ||
| 20 | |||
| 19 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; | 21 | use super::{AddFilters, RepoSyncIndex, RepoSyncNeeds}; |
| 20 | 22 | ||
| 21 | // ============================================================================= | 23 | // ============================================================================= |
| @@ -98,6 +100,8 @@ pub struct SelfSubscriber { | |||
| 98 | action_tx: mpsc::Sender<AddFilters>, | 100 | action_tx: mpsc::Sender<AddFilters>, |
| 99 | /// Last time we connected - used for since filter on reconnect | 101 | /// Last time we connected - used for since filter on reconnect |
| 100 | last_connected: Option<Timestamp>, | 102 | last_connected: Option<Timestamp>, |
| 103 | /// Database for querying existing events on startup | ||
| 104 | database: SharedDatabase, | ||
| 101 | } | 105 | } |
| 102 | 106 | ||
| 103 | impl SelfSubscriber { | 107 | impl SelfSubscriber { |
| @@ -108,11 +112,13 @@ impl SelfSubscriber { | |||
| 108 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) | 112 | /// * `relay_domain` - Our service domain (used for filtering relevant repos) |
| 109 | /// * `repo_sync_index` - Shared index to update with discovered repos | 113 | /// * `repo_sync_index` - Shared index to update with discovered repos |
| 110 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager | 114 | /// * `action_tx` - Channel to send AddFilters actions to the SyncManager |
| 115 | /// * `database` - Database for querying existing events on startup | ||
| 111 | pub fn new( | 116 | pub fn new( |
| 112 | own_relay_url: String, | 117 | own_relay_url: String, |
| 113 | relay_domain: String, | 118 | relay_domain: String, |
| 114 | repo_sync_index: RepoSyncIndex, | 119 | repo_sync_index: RepoSyncIndex, |
| 115 | action_tx: mpsc::Sender<AddFilters>, | 120 | action_tx: mpsc::Sender<AddFilters>, |
| 121 | database: SharedDatabase, | ||
| 116 | ) -> Self { | 122 | ) -> Self { |
| 117 | Self { | 123 | Self { |
| 118 | own_relay_url, | 124 | own_relay_url, |
| @@ -120,6 +126,7 @@ impl SelfSubscriber { | |||
| 120 | repo_sync_index, | 126 | repo_sync_index, |
| 121 | action_tx, | 127 | action_tx, |
| 122 | last_connected: None, | 128 | last_connected: None, |
| 129 | database, | ||
| 123 | } | 130 | } |
| 124 | } | 131 | } |
| 125 | 132 | ||
| @@ -135,6 +142,127 @@ impl SelfSubscriber { | |||
| 135 | .unwrap_or(Duration::from_millis(5000)) | 142 | .unwrap_or(Duration::from_millis(5000)) |
| 136 | } | 143 | } |
| 137 | 144 | ||
| 145 | /// Load existing events from database on startup | ||
| 146 | /// | ||
| 147 | /// Queries the database with two separate queries to build the initial | ||
| 148 | /// PendingUpdates state. This ensures all repos get Layer 2/3 filters | ||
| 149 | /// created, not just those returned by the WebSocket subscription | ||
| 150 | /// (which has limits on the number of events returned). | ||
| 151 | /// | ||
| 152 | /// Query order: | ||
| 153 | /// 1. First query: Get announcements (30617) to populate repo_sync_index | ||
| 154 | /// with repos and their relays | ||
| 155 | /// 2. Second query: Get root events (1617/1618/1621) for handle_root_event() | ||
| 156 | /// to add root event IDs for Layer 3 filter creation | ||
| 157 | /// | ||
| 158 | /// Both queries use `.since(last_connected)` if available for incremental | ||
| 159 | /// loading on reconnect. | ||
| 160 | /// | ||
| 161 | /// Returns a PendingUpdates containing all repos that need Layer 2/3 filters. | ||
| 162 | async fn load_existing_events(&self) -> PendingUpdates { | ||
| 163 | let mut pending = PendingUpdates::new(); | ||
| 164 | |||
| 165 | // Log whether this is a full or incremental load | ||
| 166 | if let Some(since) = self.last_connected { | ||
| 167 | tracing::info!( | ||
| 168 | since = %since, | ||
| 169 | "Loading events incrementally from database (reconnect)" | ||
| 170 | ); | ||
| 171 | } else { | ||
| 172 | tracing::info!("Loading all events from database (first connection)"); | ||
| 173 | } | ||
| 174 | |||
| 175 | // First query: Get announcements to populate repo_sync_index | ||
| 176 | let mut announcement_filter = Filter::new().kind(Kind::GitRepoAnnouncement); | ||
| 177 | if let Some(timestamp) = self.last_connected { | ||
| 178 | announcement_filter = announcement_filter.since(timestamp); | ||
| 179 | } | ||
| 180 | |||
| 181 | let announcements = match self.database.query(announcement_filter).await { | ||
| 182 | Ok(events) => { | ||
| 183 | tracing::info!( | ||
| 184 | count = events.len(), | ||
| 185 | "Loaded announcements from database" | ||
| 186 | ); | ||
| 187 | events | ||
| 188 | } | ||
| 189 | Err(e) => { | ||
| 190 | tracing::error!( | ||
| 191 | error = %e, | ||
| 192 | "Failed to query announcements from database" | ||
| 193 | ); | ||
| 194 | return pending; | ||
| 195 | } | ||
| 196 | }; | ||
| 197 | |||
| 198 | // Process announcements | ||
| 199 | let mut announcements_loaded = 0; | ||
| 200 | for event in announcements.iter() { | ||
| 201 | if let Some(repo_id) = Self::extract_repo_id(event) { | ||
| 202 | let relays = Self::extract_relay_urls(event); | ||
| 203 | pending.add_repo(repo_id, relays, HashSet::new()); | ||
| 204 | announcements_loaded += 1; | ||
| 205 | } | ||
| 206 | } | ||
| 207 | |||
| 208 | // Update repo_sync_index with announcements BEFORE querying root events | ||
| 209 | { | ||
| 210 | let mut index = self.repo_sync_index.write().await; | ||
| 211 | for (repo_id, needs) in &pending.repos { | ||
| 212 | let entry = index | ||
| 213 | .entry(repo_id.clone()) | ||
| 214 | .or_insert_with(|| RepoSyncNeeds { | ||
| 215 | relays: HashSet::new(), | ||
| 216 | root_events: HashSet::new(), | ||
| 217 | }); | ||
| 218 | entry.relays.extend(needs.relays.clone()); | ||
| 219 | } | ||
| 220 | } | ||
| 221 | |||
| 222 | // Second query: Get root events for handle_root_event() | ||
| 223 | let mut root_filter = Filter::new().kinds(vec![ | ||
| 224 | Kind::GitPatch, | ||
| 225 | Kind::GitIssue, | ||
| 226 | Kind::GitPullRequest, | ||
| 227 | ]); | ||
| 228 | if let Some(timestamp) = self.last_connected { | ||
| 229 | root_filter = root_filter.since(timestamp); | ||
| 230 | } | ||
| 231 | |||
| 232 | let root_events = match self.database.query(root_filter).await { | ||
| 233 | Ok(events) => { | ||
| 234 | tracing::info!( | ||
| 235 | count = events.len(), | ||
| 236 | "Loaded root events from database" | ||
| 237 | ); | ||
| 238 | events | ||
| 239 | } | ||
| 240 | Err(e) => { | ||
| 241 | tracing::error!( | ||
| 242 | error = %e, | ||
| 243 | "Failed to query root events from database" | ||
| 244 | ); | ||
| 245 | // Continue with just announcements | ||
| 246 | return pending; | ||
| 247 | } | ||
| 248 | }; | ||
| 249 | |||
| 250 | // Process root events | ||
| 251 | let mut root_events_processed = 0; | ||
| 252 | for event in root_events.iter() { | ||
| 253 | self.handle_root_event(event, &mut pending).await; | ||
| 254 | root_events_processed += 1; | ||
| 255 | } | ||
| 256 | |||
| 257 | tracing::info!( | ||
| 258 | announcements_loaded = announcements_loaded, | ||
| 259 | root_events_processed = root_events_processed, | ||
| 260 | "Processed existing events from database" | ||
| 261 | ); | ||
| 262 | |||
| 263 | pending | ||
| 264 | } | ||
| 265 | |||
| 138 | /// Process a relay pool notification | 266 | /// Process a relay pool notification |
| 139 | /// | 267 | /// |
| 140 | /// Handles incoming events from the subscription, queueing 30617 announcements | 268 | /// Handles incoming events from the subscription, queueing 30617 announcements |
| @@ -276,33 +404,22 @@ impl SelfSubscriber { | |||
| 276 | // Subscribe to announcement and root event kinds | 404 | // Subscribe to announcement and root event kinds |
| 277 | // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) | 405 | // Per v4 spec: 30617, 1617, 1618, 1621 (NOT 30618) |
| 278 | // Plus kind 10317 (User Grasp List) for GRASP discovery | 406 | // Plus kind 10317 (User Grasp List) for GRASP discovery |
| 279 | // Check if we have a last_connected time for reconnect filtering | 407 | let mut filter = Filter::new().kinds(vec![ |
| 280 | let filter = if let Some(last) = self.last_connected { | 408 | Kind::GitRepoAnnouncement, |
| 409 | Kind::GitPatch, | ||
| 410 | Kind::GitIssue, | ||
| 411 | Kind::GitPullRequest, | ||
| 412 | Kind::GitUserGraspList, | ||
| 413 | ]); | ||
| 414 | if let Some(timestamp) = self.last_connected { | ||
| 281 | // Quick reconnect - use since filter (15 min buffer) | 415 | // Quick reconnect - use since filter (15 min buffer) |
| 282 | let since = Timestamp::from(last.as_secs().saturating_sub(15 * 60)); | 416 | let since = Timestamp::from(timestamp.as_secs().saturating_sub(15 * 60)); |
| 283 | tracing::debug!( | 417 | tracing::debug!( |
| 284 | since = %since, | 418 | since = %since, |
| 285 | "Using since filter for reconnect" | 419 | "Using since filter for reconnect" |
| 286 | ); | 420 | ); |
| 287 | Filter::new() | 421 | filter = filter.since(since); |
| 288 | .kinds(vec![ | 422 | } |
| 289 | Kind::GitRepoAnnouncement, // Repository Announcements | ||
| 290 | Kind::GitPatch, // Patches | ||
| 291 | Kind::GitIssue, // Issues | ||
| 292 | Kind::GitPullRequest, // Pull Requests | ||
| 293 | Kind::GitUserGraspList, // User Grasp List | ||
| 294 | ]) | ||
| 295 | .since(since) | ||
| 296 | } else { | ||
| 297 | // First connection - no since filter | ||
| 298 | Filter::new().kinds(vec![ | ||
| 299 | Kind::GitRepoAnnouncement, // Repository Announcements | ||
| 300 | Kind::GitPatch, // Patches | ||
| 301 | Kind::GitIssue, // Issues | ||
| 302 | Kind::GitPullRequest, // Pull Requests | ||
| 303 | Kind::GitUserGraspList, // User Grasp List | ||
| 304 | ]) | ||
| 305 | }; | ||
| 306 | 423 | ||
| 307 | // Update last_connected AFTER creating filter but BEFORE subscribing | 424 | // Update last_connected AFTER creating filter but BEFORE subscribing |
| 308 | self.last_connected = Some(Timestamp::now()); | 425 | self.last_connected = Some(Timestamp::now()); |
| @@ -323,7 +440,11 @@ impl SelfSubscriber { | |||
| 323 | 440 | ||
| 324 | let mut notifications = client.notifications(); | 441 | let mut notifications = client.notifications(); |
| 325 | let batch_window = Self::get_batch_window(); | 442 | let batch_window = Self::get_batch_window(); |
| 326 | let mut pending = PendingUpdates::new(); | 443 | |
| 444 | // Load existing events from database on startup | ||
| 445 | // This ensures all repos get Layer 2/3 filters created, not just those | ||
| 446 | // returned by the WebSocket subscription (which has limits) | ||
| 447 | let mut pending = self.load_existing_events().await; | ||
| 327 | 448 | ||
| 328 | // Timer does NOT reset on new events - use interval | 449 | // Timer does NOT reset on new events - use interval |
| 329 | let mut timer = tokio::time::interval(batch_window); | 450 | let mut timer = tokio::time::interval(batch_window); |