diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 12:34:28 +0000 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2025-12-10 12:34:28 +0000 |
| commit | b32772848f7325d4b3e1e15b05c5163df0b9671b (patch) | |
| tree | 50a47f996b0b60ba191eee1ea4faeb1b1b83ecaf /src/sync/mod.rs | |
| parent | 5f9d3ca0db4ffc9088be0b9ac9558efe1a4da810 (diff) | |
sync: implement filter consolidation system
Diffstat (limited to 'src/sync/mod.rs')
| -rw-r--r-- | src/sync/mod.rs | 169 |
1 files changed, 167 insertions, 2 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index f704ca7..b5133e6 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -346,6 +346,12 @@ pub struct ConnectNotification { | |||
| 346 | /// Quick reconnect window in seconds (15 minutes) | 346 | /// Quick reconnect window in seconds (15 minutes) |
| 347 | const QUICK_RECONNECT_WINDOW_SECS: u64 = 15 * 60; | 347 | const QUICK_RECONNECT_WINDOW_SECS: u64 = 15 * 60; |
| 348 | 348 | ||
| 349 | /// Maximum filter count before triggering consolidation | ||
| 350 | const CONSOLIDATION_THRESHOLD: usize = 70; | ||
| 351 | |||
| 352 | /// Maximum time to wait for pending batches (30 seconds) | ||
| 353 | const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30; | ||
| 354 | |||
| 349 | /// Manages proactive synchronization with external relays | 355 | /// Manages proactive synchronization with external relays |
| 350 | /// | 356 | /// |
| 351 | /// The SyncManager runs as a background task, subscribing to repository | 357 | /// The SyncManager runs as a background task, subscribing to repository |
| @@ -708,8 +714,8 @@ impl SyncManager { | |||
| 708 | } | 714 | } |
| 709 | } | 715 | } |
| 710 | 716 | ||
| 711 | // Step 2: Check if consolidation is needed (Phase 6 will implement maybe_consolidate) | 717 | // Step 2: Check if consolidation is needed BEFORE adding new filters |
| 712 | // self.maybe_consolidate(&action.relay_url, action.filters.len()); | 718 | self.maybe_consolidate(&action.relay_url, action.filters.len()).await; |
| 713 | 719 | ||
| 714 | // Step 3: Get connection and subscribe to all filters | 720 | // Step 3: Get connection and subscribe to all filters |
| 715 | let connection = match self.connections.get(&action.relay_url) { | 721 | let connection = match self.connections.get(&action.relay_url) { |
| @@ -1270,4 +1276,163 @@ impl SyncManager { | |||
| 1270 | } | 1276 | } |
| 1271 | } | 1277 | } |
| 1272 | } | 1278 | } |
| 1279 | |||
| 1280 | // ========================================================================= | ||
| 1281 | // Consolidation System (Phase 6) | ||
| 1282 | // ========================================================================= | ||
| 1283 | |||
| 1284 | /// Get the current filter count for a relay | ||
| 1285 | /// | ||
| 1286 | /// Counts all outstanding subscriptions in pending batches for this relay. | ||
| 1287 | /// This is used to determine if consolidation is needed. | ||
| 1288 | async fn get_filter_count(&self, relay_url: &str) -> usize { | ||
| 1289 | let pending = self.pending_sync_index.read().await; | ||
| 1290 | |||
| 1291 | let count = match pending.get(relay_url) { | ||
| 1292 | Some(batches) => { | ||
| 1293 | batches.iter().map(|b| b.outstanding_subs.len()).sum() | ||
| 1294 | } | ||
| 1295 | None => 0, | ||
| 1296 | }; | ||
| 1297 | |||
| 1298 | tracing::debug!( | ||
| 1299 | relay = %relay_url, | ||
| 1300 | filter_count = count, | ||
| 1301 | "Counted active filters for relay" | ||
| 1302 | ); | ||
| 1303 | |||
| 1304 | count | ||
| 1305 | } | ||
| 1306 | |||
| 1307 | /// Wait until all pending batches for a relay are complete | ||
| 1308 | /// | ||
| 1309 | /// Polls the pending_sync_index until the relay has no pending batches. | ||
| 1310 | /// Returns error if timeout (30 seconds) is exceeded. | ||
| 1311 | async fn wait_pending_complete(&self, relay_url: &str) -> Result<(), String> { | ||
| 1312 | use std::time::Duration; | ||
| 1313 | use tokio::time::{sleep, Instant}; | ||
| 1314 | |||
| 1315 | let start = Instant::now(); | ||
| 1316 | let timeout = Duration::from_secs(CONSOLIDATION_WAIT_TIMEOUT_SECS); | ||
| 1317 | |||
| 1318 | tracing::debug!( | ||
| 1319 | relay = %relay_url, | ||
| 1320 | timeout_secs = CONSOLIDATION_WAIT_TIMEOUT_SECS, | ||
| 1321 | "Waiting for pending batches to complete" | ||
| 1322 | ); | ||
| 1323 | |||
| 1324 | loop { | ||
| 1325 | // Check if no pending batches | ||
| 1326 | { | ||
| 1327 | let pending = self.pending_sync_index.read().await; | ||
| 1328 | if !pending.contains_key(relay_url) { | ||
| 1329 | tracing::debug!( | ||
| 1330 | relay = %relay_url, | ||
| 1331 | elapsed_ms = start.elapsed().as_millis(), | ||
| 1332 | "All pending batches complete" | ||
| 1333 | ); | ||
| 1334 | return Ok(()); | ||
| 1335 | } | ||
| 1336 | } | ||
| 1337 | |||
| 1338 | // Check timeout | ||
| 1339 | if start.elapsed() > timeout { | ||
| 1340 | tracing::warn!( | ||
| 1341 | relay = %relay_url, | ||
| 1342 | timeout_secs = CONSOLIDATION_WAIT_TIMEOUT_SECS, | ||
| 1343 | "Timeout waiting for pending batches" | ||
| 1344 | ); | ||
| 1345 | return Err(format!( | ||
| 1346 | "Timeout waiting for pending batches on {} after {}s", | ||
| 1347 | relay_url, CONSOLIDATION_WAIT_TIMEOUT_SECS | ||
| 1348 | )); | ||
| 1349 | } | ||
| 1350 | |||
| 1351 | // Short poll interval | ||
| 1352 | sleep(Duration::from_millis(100)).await; | ||
| 1353 | } | ||
| 1354 | } | ||
| 1355 | |||
| 1356 | /// Check if consolidation is needed and trigger if threshold exceeded | ||
| 1357 | /// | ||
| 1358 | /// Compares current filter count + new filter count against the threshold. | ||
| 1359 | /// If exceeded, triggers consolidation before adding new filters. | ||
| 1360 | async fn maybe_consolidate(&mut self, relay_url: &str, new_count: usize) { | ||
| 1361 | let current_count = self.get_filter_count(relay_url).await; | ||
| 1362 | |||
| 1363 | if current_count + new_count > CONSOLIDATION_THRESHOLD { | ||
| 1364 | tracing::info!( | ||
| 1365 | relay = %relay_url, | ||
| 1366 | current_count = current_count, | ||
| 1367 | new_count = new_count, | ||
| 1368 | threshold = CONSOLIDATION_THRESHOLD, | ||
| 1369 | "Filter count exceeds threshold, consolidating" | ||
| 1370 | ); | ||
| 1371 | |||
| 1372 | if let Err(e) = self.consolidate(relay_url).await { | ||
| 1373 | tracing::error!( | ||
| 1374 | relay = %relay_url, | ||
| 1375 | error = %e, | ||
| 1376 | "Consolidation failed" | ||
| 1377 | ); | ||
| 1378 | } | ||
| 1379 | } | ||
| 1380 | } | ||
| 1381 | |||
| 1382 | /// Consolidate all subscriptions for a relay | ||
| 1383 | /// | ||
| 1384 | /// This method: | ||
| 1385 | /// 1. Waits for all pending batches to complete | ||
| 1386 | /// 2. Unsubscribes from all active subscriptions | ||
| 1387 | /// 3. Rebuilds Layer 2 and Layer 3 with since filter | ||
| 1388 | /// | ||
| 1389 | /// Layer 1 (announcements) remains active and is NOT unsubscribed. | ||
| 1390 | async fn consolidate(&mut self, relay_url: &str) -> Result<(), String> { | ||
| 1391 | tracing::info!( | ||
| 1392 | relay = %relay_url, | ||
| 1393 | "Starting consolidation" | ||
| 1394 | ); | ||
| 1395 | |||
| 1396 | // Step 1: Wait for all pending batches to complete | ||
| 1397 | self.wait_pending_complete(relay_url).await?; | ||
| 1398 | |||
| 1399 | // Step 2: Get connection and unsubscribe all | ||
| 1400 | let connection = match self.connections.get(relay_url) { | ||
| 1401 | Some(conn) => conn, | ||
| 1402 | None => { | ||
| 1403 | tracing::debug!( | ||
| 1404 | relay = %relay_url, | ||
| 1405 | "No connection found, skipping consolidation" | ||
| 1406 | ); | ||
| 1407 | return Ok(()); // No connection, nothing to consolidate | ||
| 1408 | } | ||
| 1409 | }; | ||
| 1410 | |||
| 1411 | connection.unsubscribe_all().await; | ||
| 1412 | |||
| 1413 | // Step 3: Rebuild all subscriptions with since filter | ||
| 1414 | let now = Timestamp::now(); | ||
| 1415 | let since = Timestamp::from(now.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS)); | ||
| 1416 | |||
| 1417 | // Re-subscribe to Layer 1 with since filter | ||
| 1418 | let layer1_filter = filters::build_announcement_filter(Some(since)); | ||
| 1419 | if let Err(e) = connection.subscribe_filter(layer1_filter).await { | ||
| 1420 | tracing::error!( | ||
| 1421 | relay = %relay_url, | ||
| 1422 | error = %e, | ||
| 1423 | "Failed to re-subscribe to Layer 1 during consolidation" | ||
| 1424 | ); | ||
| 1425 | } | ||
| 1426 | |||
| 1427 | // Rebuild Layer 2 and Layer 3 with since filter | ||
| 1428 | self.rebuild_layer2_and_layer3(relay_url, Some(since)).await; | ||
| 1429 | |||
| 1430 | tracing::info!( | ||
| 1431 | relay = %relay_url, | ||
| 1432 | since = %since, | ||
| 1433 | "Consolidation complete - filter count reset" | ||
| 1434 | ); | ||
| 1435 | |||
| 1436 | Ok(()) | ||
| 1437 | } | ||
| 1273 | } \ No newline at end of file | 1438 | } \ No newline at end of file |