upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/sync/mod.rs
diff options
context:
space:
mode:
authorDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 12:34:28 +0000
committerDanConwayDev <DanConwayDev@protonmail.com>2025-12-10 12:34:28 +0000
commitb32772848f7325d4b3e1e15b05c5163df0b9671b (patch)
tree50a47f996b0b60ba191eee1ea4faeb1b1b83ecaf /src/sync/mod.rs
parent5f9d3ca0db4ffc9088be0b9ac9558efe1a4da810 (diff)
sync: implement filter consolidation system
Diffstat (limited to 'src/sync/mod.rs')
-rw-r--r--src/sync/mod.rs169
1 files changed, 167 insertions, 2 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index f704ca7..b5133e6 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -346,6 +346,12 @@ pub struct ConnectNotification {
346/// Quick reconnect window in seconds (15 minutes) 346/// Quick reconnect window in seconds (15 minutes)
347const QUICK_RECONNECT_WINDOW_SECS: u64 = 15 * 60; 347const QUICK_RECONNECT_WINDOW_SECS: u64 = 15 * 60;
348 348
349/// Maximum filter count before triggering consolidation
350const CONSOLIDATION_THRESHOLD: usize = 70;
351
352/// Maximum time to wait for pending batches (30 seconds)
353const CONSOLIDATION_WAIT_TIMEOUT_SECS: u64 = 30;
354
349/// Manages proactive synchronization with external relays 355/// Manages proactive synchronization with external relays
350/// 356///
351/// The SyncManager runs as a background task, subscribing to repository 357/// The SyncManager runs as a background task, subscribing to repository
@@ -708,8 +714,8 @@ impl SyncManager {
708 } 714 }
709 } 715 }
710 716
711 // Step 2: Check if consolidation is needed (Phase 6 will implement maybe_consolidate) 717 // Step 2: Check if consolidation is needed BEFORE adding new filters
712 // self.maybe_consolidate(&action.relay_url, action.filters.len()); 718 self.maybe_consolidate(&action.relay_url, action.filters.len()).await;
713 719
714 // Step 3: Get connection and subscribe to all filters 720 // Step 3: Get connection and subscribe to all filters
715 let connection = match self.connections.get(&action.relay_url) { 721 let connection = match self.connections.get(&action.relay_url) {
@@ -1270,4 +1276,163 @@ impl SyncManager {
1270 } 1276 }
1271 } 1277 }
1272 } 1278 }
1279
1280 // =========================================================================
1281 // Consolidation System (Phase 6)
1282 // =========================================================================
1283
1284 /// Get the current filter count for a relay
1285 ///
1286 /// Counts all outstanding subscriptions in pending batches for this relay.
1287 /// This is used to determine if consolidation is needed.
1288 async fn get_filter_count(&self, relay_url: &str) -> usize {
1289 let pending = self.pending_sync_index.read().await;
1290
1291 let count = match pending.get(relay_url) {
1292 Some(batches) => {
1293 batches.iter().map(|b| b.outstanding_subs.len()).sum()
1294 }
1295 None => 0,
1296 };
1297
1298 tracing::debug!(
1299 relay = %relay_url,
1300 filter_count = count,
1301 "Counted active filters for relay"
1302 );
1303
1304 count
1305 }
1306
1307 /// Wait until all pending batches for a relay are complete
1308 ///
1309 /// Polls the pending_sync_index until the relay has no pending batches.
1310 /// Returns error if timeout (30 seconds) is exceeded.
1311 async fn wait_pending_complete(&self, relay_url: &str) -> Result<(), String> {
1312 use std::time::Duration;
1313 use tokio::time::{sleep, Instant};
1314
1315 let start = Instant::now();
1316 let timeout = Duration::from_secs(CONSOLIDATION_WAIT_TIMEOUT_SECS);
1317
1318 tracing::debug!(
1319 relay = %relay_url,
1320 timeout_secs = CONSOLIDATION_WAIT_TIMEOUT_SECS,
1321 "Waiting for pending batches to complete"
1322 );
1323
1324 loop {
1325 // Check if no pending batches
1326 {
1327 let pending = self.pending_sync_index.read().await;
1328 if !pending.contains_key(relay_url) {
1329 tracing::debug!(
1330 relay = %relay_url,
1331 elapsed_ms = start.elapsed().as_millis(),
1332 "All pending batches complete"
1333 );
1334 return Ok(());
1335 }
1336 }
1337
1338 // Check timeout
1339 if start.elapsed() > timeout {
1340 tracing::warn!(
1341 relay = %relay_url,
1342 timeout_secs = CONSOLIDATION_WAIT_TIMEOUT_SECS,
1343 "Timeout waiting for pending batches"
1344 );
1345 return Err(format!(
1346 "Timeout waiting for pending batches on {} after {}s",
1347 relay_url, CONSOLIDATION_WAIT_TIMEOUT_SECS
1348 ));
1349 }
1350
1351 // Short poll interval
1352 sleep(Duration::from_millis(100)).await;
1353 }
1354 }
1355
1356 /// Check if consolidation is needed and trigger if threshold exceeded
1357 ///
1358 /// Compares current filter count + new filter count against the threshold.
1359 /// If exceeded, triggers consolidation before adding new filters.
1360 async fn maybe_consolidate(&mut self, relay_url: &str, new_count: usize) {
1361 let current_count = self.get_filter_count(relay_url).await;
1362
1363 if current_count + new_count > CONSOLIDATION_THRESHOLD {
1364 tracing::info!(
1365 relay = %relay_url,
1366 current_count = current_count,
1367 new_count = new_count,
1368 threshold = CONSOLIDATION_THRESHOLD,
1369 "Filter count exceeds threshold, consolidating"
1370 );
1371
1372 if let Err(e) = self.consolidate(relay_url).await {
1373 tracing::error!(
1374 relay = %relay_url,
1375 error = %e,
1376 "Consolidation failed"
1377 );
1378 }
1379 }
1380 }
1381
1382 /// Consolidate all subscriptions for a relay
1383 ///
1384 /// This method:
1385 /// 1. Waits for all pending batches to complete
1386 /// 2. Unsubscribes from all active subscriptions
1387 /// 3. Rebuilds Layer 2 and Layer 3 with since filter
1388 ///
1389 /// Layer 1 (announcements) remains active and is NOT unsubscribed.
1390 async fn consolidate(&mut self, relay_url: &str) -> Result<(), String> {
1391 tracing::info!(
1392 relay = %relay_url,
1393 "Starting consolidation"
1394 );
1395
1396 // Step 1: Wait for all pending batches to complete
1397 self.wait_pending_complete(relay_url).await?;
1398
1399 // Step 2: Get connection and unsubscribe all
1400 let connection = match self.connections.get(relay_url) {
1401 Some(conn) => conn,
1402 None => {
1403 tracing::debug!(
1404 relay = %relay_url,
1405 "No connection found, skipping consolidation"
1406 );
1407 return Ok(()); // No connection, nothing to consolidate
1408 }
1409 };
1410
1411 connection.unsubscribe_all().await;
1412
1413 // Step 3: Rebuild all subscriptions with since filter
1414 let now = Timestamp::now();
1415 let since = Timestamp::from(now.as_secs().saturating_sub(QUICK_RECONNECT_WINDOW_SECS));
1416
1417 // Re-subscribe to Layer 1 with since filter
1418 let layer1_filter = filters::build_announcement_filter(Some(since));
1419 if let Err(e) = connection.subscribe_filter(layer1_filter).await {
1420 tracing::error!(
1421 relay = %relay_url,
1422 error = %e,
1423 "Failed to re-subscribe to Layer 1 during consolidation"
1424 );
1425 }
1426
1427 // Rebuild Layer 2 and Layer 3 with since filter
1428 self.rebuild_layer2_and_layer3(relay_url, Some(since)).await;
1429
1430 tracing::info!(
1431 relay = %relay_url,
1432 since = %since,
1433 "Consolidation complete - filter count reset"
1434 );
1435
1436 Ok(())
1437 }
1273} \ No newline at end of file 1438} \ No newline at end of file