diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/main.rs | 7 | ||||
| -rw-r--r-- | src/nostr/builder.rs | 54 | ||||
| -rw-r--r-- | src/nostr/policy/mod.rs | 19 | ||||
| -rw-r--r-- | src/sync/mod.rs | 66 | ||||
| -rw-r--r-- | src/sync/self_subscriber.rs | 4 |
5 files changed, 147 insertions, 3 deletions
diff --git a/src/main.rs b/src/main.rs index ab6ede7..ebe05a3 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -132,6 +132,13 @@ async fn main() -> Result<()> { | |||
| 132 | // Get a reference to the rejected events index for shutdown persistence | 132 | // Get a reference to the rejected events index for shutdown persistence |
| 133 | let shutdown_rejected_index = sync_manager.rejected_events_index(); | 133 | let shutdown_rejected_index = sync_manager.rejected_events_index(); |
| 134 | 134 | ||
| 135 | // Wire repo_sync_index into write policy so user-submitted purgatory announcements | ||
| 136 | // get registered for state event sync immediately (Fix 3). | ||
| 137 | let repo_sync_index = sync_manager.repo_sync_index(); | ||
| 138 | relay_with_db | ||
| 139 | .write_policy | ||
| 140 | .set_repo_sync_index(repo_sync_index); | ||
| 141 | |||
| 135 | tokio::spawn(async move { | 142 | tokio::spawn(async move { |
| 136 | sync_manager.run().await; | 143 | sync_manager.run().await; |
| 137 | }); | 144 | }); |
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs index aff12a6..8d1e461 100644 --- a/src/nostr/builder.rs +++ b/src/nostr/builder.rs | |||
| @@ -17,6 +17,7 @@ use crate::nostr::policy::{ | |||
| 17 | AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, | 17 | AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, |
| 18 | RelatedEventPolicy, StatePolicy, StateResult, | 18 | RelatedEventPolicy, StatePolicy, StateResult, |
| 19 | }; | 19 | }; |
| 20 | use crate::sync::{RepoSyncIndex, RepoSyncNeeds, SyncLevel}; | ||
| 20 | 21 | ||
| 21 | /// Type alias for the shared database used by the relay | 22 | /// Type alias for the shared database used by the relay |
| 22 | pub type SharedDatabase = Arc<dyn NostrDatabase>; | 23 | pub type SharedDatabase = Arc<dyn NostrDatabase>; |
| @@ -98,6 +99,14 @@ impl Nip34WritePolicy { | |||
| 98 | self.ctx.set_local_relay(relay); | 99 | self.ctx.set_local_relay(relay); |
| 99 | } | 100 | } |
| 100 | 101 | ||
| 102 | /// Set the repo sync index so that user-submitted purgatory announcements can | ||
| 103 | /// be registered for state event sync immediately. | ||
| 104 | /// | ||
| 105 | /// This must be called after SyncManager is created. | ||
| 106 | pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { | ||
| 107 | self.ctx.set_repo_sync_index(index); | ||
| 108 | } | ||
| 109 | |||
| 101 | /// Handle repository announcement event | 110 | /// Handle repository announcement event |
| 102 | async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { | 111 | async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { |
| 103 | let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); | 112 | let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); |
| @@ -146,6 +155,51 @@ impl Nip34WritePolicy { | |||
| 146 | "Accepted announcement to purgatory: {} (waiting for git data)", | 155 | "Accepted announcement to purgatory: {} (waiting for git data)", |
| 147 | event_id_str | 156 | event_id_str |
| 148 | ); | 157 | ); |
| 158 | |||
| 159 | // Register repo in repo_sync_index with StateOnly level so that | ||
| 160 | // state event sync starts promptly via the next batch EOSE recompute. | ||
| 161 | // This handles user-submitted purgatory announcements - the SelfSubscriber | ||
| 162 | // only sees DB events, so it won't pick these up automatically. | ||
| 163 | if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() { | ||
| 164 | if let Ok(announcement) = | ||
| 165 | RepositoryAnnouncement::from_event(event.clone()) | ||
| 166 | { | ||
| 167 | use std::collections::HashSet; | ||
| 168 | let repo_id = format!( | ||
| 169 | "30617:{}:{}", | ||
| 170 | event.pubkey, | ||
| 171 | announcement.identifier | ||
| 172 | ); | ||
| 173 | |||
| 174 | // Extract relay URLs from the announcement event tags | ||
| 175 | let relays: HashSet<String> = event | ||
| 176 | .tags | ||
| 177 | .iter() | ||
| 178 | .flat_map(|tag| { | ||
| 179 | let tag_vec = tag.as_slice(); | ||
| 180 | if !tag_vec.is_empty() && tag_vec[0] == "relays" { | ||
| 181 | tag_vec[1..].iter().map(|s| s.to_string()).collect::<Vec<_>>() | ||
| 182 | } else { | ||
| 183 | vec![] | ||
| 184 | } | ||
| 185 | }) | ||
| 186 | .collect(); | ||
| 187 | |||
| 188 | let mut index = repo_sync_index.write().await; | ||
| 189 | index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds { | ||
| 190 | relays, | ||
| 191 | root_events: HashSet::new(), | ||
| 192 | sync_level: SyncLevel::StateOnly, | ||
| 193 | }); | ||
| 194 | drop(index); | ||
| 195 | |||
| 196 | tracing::debug!( | ||
| 197 | repo_id = %repo_id, | ||
| 198 | "Registered purgatory announcement in repo_sync_index as StateOnly" | ||
| 199 | ); | ||
| 200 | } | ||
| 201 | } | ||
| 202 | |||
| 149 | WritePolicyResult::Reject { | 203 | WritePolicyResult::Reject { |
| 150 | status: true, // Client sees OK | 204 | status: true, // Client sees OK |
| 151 | message: "purgatory: won't be served until git data arrives".into(), | 205 | message: "purgatory: won't be served until git data arrives".into(), |
diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs index 1566b6c..c958586 100644 --- a/src/nostr/policy/mod.rs +++ b/src/nostr/policy/mod.rs | |||
| @@ -20,6 +20,7 @@ pub use crate::git::sync::AlignmentResult; | |||
| 20 | 20 | ||
| 21 | use super::SharedDatabase; | 21 | use super::SharedDatabase; |
| 22 | use crate::purgatory::Purgatory; | 22 | use crate::purgatory::Purgatory; |
| 23 | use crate::sync::RepoSyncIndex; | ||
| 23 | use nostr_relay_builder::LocalRelay; | 24 | use nostr_relay_builder::LocalRelay; |
| 24 | use std::sync::Arc; | 25 | use std::sync::Arc; |
| 25 | 26 | ||
| @@ -34,6 +35,8 @@ pub struct PolicyContext { | |||
| 34 | pub local_relay: Arc<std::sync::RwLock<Option<LocalRelay>>>, | 35 | pub local_relay: Arc<std::sync::RwLock<Option<LocalRelay>>>, |
| 35 | /// Configuration reference for policy settings (includes blacklists) | 36 | /// Configuration reference for policy settings (includes blacklists) |
| 36 | pub config: crate::config::Config, | 37 | pub config: crate::config::Config, |
| 38 | /// Repo sync index for registering purgatory announcements (set after SyncManager creation) | ||
| 39 | pub repo_sync_index: Arc<std::sync::RwLock<Option<RepoSyncIndex>>>, | ||
| 37 | } | 40 | } |
| 38 | 41 | ||
| 39 | impl PolicyContext { | 42 | impl PolicyContext { |
| @@ -51,6 +54,7 @@ impl PolicyContext { | |||
| 51 | purgatory, | 54 | purgatory, |
| 52 | local_relay: Arc::new(std::sync::RwLock::new(None)), | 55 | local_relay: Arc::new(std::sync::RwLock::new(None)), |
| 53 | config, | 56 | config, |
| 57 | repo_sync_index: Arc::new(std::sync::RwLock::new(None)), | ||
| 54 | } | 58 | } |
| 55 | } | 59 | } |
| 56 | 60 | ||
| @@ -68,4 +72,19 @@ impl PolicyContext { | |||
| 68 | let guard = self.local_relay.read().unwrap(); | 72 | let guard = self.local_relay.read().unwrap(); |
| 69 | guard.clone() | 73 | guard.clone() |
| 70 | } | 74 | } |
| 75 | |||
| 76 | /// Set the repo sync index after SyncManager has been created. | ||
| 77 | /// | ||
| 78 | /// This allows purgatory announcements submitted by users to be registered | ||
| 79 | /// in the sync index so state event sync starts promptly. | ||
| 80 | pub fn set_repo_sync_index(&self, index: RepoSyncIndex) { | ||
| 81 | let mut guard = self.repo_sync_index.write().unwrap(); | ||
| 82 | *guard = Some(index); | ||
| 83 | } | ||
| 84 | |||
| 85 | /// Get a clone of the repo sync index if it has been set. | ||
| 86 | pub fn get_repo_sync_index(&self) -> Option<RepoSyncIndex> { | ||
| 87 | let guard = self.repo_sync_index.read().unwrap(); | ||
| 88 | guard.clone() | ||
| 89 | } | ||
| 71 | } | 90 | } |
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 519017b..916e2b0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs | |||
| @@ -700,6 +700,14 @@ impl SyncManager { | |||
| 700 | self.rejected_events_index.save_to_disk(path) | 700 | self.rejected_events_index.save_to_disk(path) |
| 701 | } | 701 | } |
| 702 | 702 | ||
| 703 | /// Get a clone of the repo sync index Arc. | ||
| 704 | /// | ||
| 705 | /// This allows the write policy to register user-submitted purgatory announcements | ||
| 706 | /// in the sync index so that state event sync starts promptly. | ||
| 707 | pub fn repo_sync_index(&self) -> RepoSyncIndex { | ||
| 708 | self.repo_sync_index.clone() | ||
| 709 | } | ||
| 710 | |||
| 703 | /// Handle EOSE (End Of Stored Events) for a subscription | 711 | /// Handle EOSE (End Of Stored Events) for a subscription |
| 704 | /// | 712 | /// |
| 705 | /// This method: | 713 | /// This method: |
| @@ -951,9 +959,29 @@ impl SyncManager { | |||
| 951 | 959 | ||
| 952 | // Create REQ+EOSE subscriptions using original semantic filters | 960 | // Create REQ+EOSE subscriptions using original semantic filters |
| 953 | // This queries by kind/author/tags instead of by ID, which may | 961 | // This queries by kind/author/tags instead of by ID, which may |
| 954 | // succeed even when ID-based queries fail | 962 | // succeed even when ID-based queries fail. |
| 955 | let fallback_filters = filters::build_layer2_and_layer3_filters( | 963 | // Split batch_repos by SyncLevel to avoid sending Layer 2 filters |
| 956 | &batch_repos, | 964 | // (#a/#A/#q) for StateOnly (purgatory) repos - those PRs would be |
| 965 | // rejected as orphan and then silently dropped by nostr-sdk deduplication. | ||
| 966 | let (full_repos, state_only_repos) = { | ||
| 967 | let repo_index = self.repo_sync_index.read().await; | ||
| 968 | let mut full = HashSet::new(); | ||
| 969 | let mut state_only = HashSet::new(); | ||
| 970 | for repo_ref in &batch_repos { | ||
| 971 | match repo_index.get(repo_ref).map(|n| n.sync_level) { | ||
| 972 | Some(SyncLevel::StateOnly) => { | ||
| 973 | state_only.insert(repo_ref.clone()); | ||
| 974 | } | ||
| 975 | _ => { | ||
| 976 | full.insert(repo_ref.clone()); | ||
| 977 | } | ||
| 978 | } | ||
| 979 | } | ||
| 980 | (full, state_only) | ||
| 981 | }; | ||
| 982 | let fallback_filters = filters::build_sync_level_aware_filters( | ||
| 983 | &full_repos, | ||
| 984 | &state_only_repos, | ||
| 957 | &batch_root_events, | 985 | &batch_root_events, |
| 958 | None, | 986 | None, |
| 959 | ); | 987 | ); |
| @@ -1033,12 +1061,24 @@ impl SyncManager { | |||
| 1033 | { | 1061 | { |
| 1034 | let mut completed_batch = batches.remove(idx); | 1062 | let mut completed_batch = batches.remove(idx); |
| 1035 | completed_batch.failed = true; // Mark as failed | 1063 | completed_batch.failed = true; // Mark as failed |
| 1064 | let is_generic = | ||
| 1065 | completed_batch.items.repos.is_empty() | ||
| 1066 | && completed_batch.items.root_events.is_empty(); | ||
| 1036 | if batches.is_empty() { | 1067 | if batches.is_empty() { |
| 1037 | pending.remove(&relay_url_for_fallback); | 1068 | pending.remove(&relay_url_for_fallback); |
| 1038 | } | 1069 | } |
| 1039 | drop(pending); | 1070 | drop(pending); |
| 1040 | self.confirm_batch(&relay_url_for_fallback, completed_batch) | 1071 | self.confirm_batch(&relay_url_for_fallback, completed_batch) |
| 1041 | .await; | 1072 | .await; |
| 1073 | // For generic filter (announcement) batches, recompute filters | ||
| 1074 | // so any purgatory repos registered during this batch get | ||
| 1075 | // state-only subscriptions triggered. | ||
| 1076 | if is_generic { | ||
| 1077 | self.recompute_new_sync_filters_for_relay( | ||
| 1078 | &relay_url_for_fallback, | ||
| 1079 | ) | ||
| 1080 | .await; | ||
| 1081 | } | ||
| 1042 | } | 1082 | } |
| 1043 | } | 1083 | } |
| 1044 | return; | 1084 | return; |
| @@ -1132,12 +1172,24 @@ impl SyncManager { | |||
| 1132 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { | 1172 | if let Some(batches) = pending.get_mut(&relay_url_for_retry) { |
| 1133 | if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { | 1173 | if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { |
| 1134 | let completed_batch = batches.remove(idx); | 1174 | let completed_batch = batches.remove(idx); |
| 1175 | let is_generic = | ||
| 1176 | completed_batch.items.repos.is_empty() | ||
| 1177 | && completed_batch.items.root_events.is_empty(); | ||
| 1135 | if batches.is_empty() { | 1178 | if batches.is_empty() { |
| 1136 | pending.remove(&relay_url_for_retry); | 1179 | pending.remove(&relay_url_for_retry); |
| 1137 | } | 1180 | } |
| 1138 | drop(pending); | 1181 | drop(pending); |
| 1139 | self.confirm_batch(&relay_url_for_retry, completed_batch) | 1182 | self.confirm_batch(&relay_url_for_retry, completed_batch) |
| 1140 | .await; | 1183 | .await; |
| 1184 | // For generic filter (announcement) batches, recompute filters | ||
| 1185 | // so any purgatory repos registered during this batch get | ||
| 1186 | // state-only subscriptions triggered. | ||
| 1187 | if is_generic { | ||
| 1188 | self.recompute_new_sync_filters_for_relay( | ||
| 1189 | &relay_url_for_retry, | ||
| 1190 | ) | ||
| 1191 | .await; | ||
| 1192 | } | ||
| 1141 | } | 1193 | } |
| 1142 | } | 1194 | } |
| 1143 | return; | 1195 | return; |
| @@ -1148,6 +1200,8 @@ impl SyncManager { | |||
| 1148 | 1200 | ||
| 1149 | // 3. Batch complete - extract and remove | 1201 | // 3. Batch complete - extract and remove |
| 1150 | let completed_batch = batches.remove(batch_idx); | 1202 | let completed_batch = batches.remove(batch_idx); |
| 1203 | let is_generic = completed_batch.items.repos.is_empty() | ||
| 1204 | && completed_batch.items.root_events.is_empty(); | ||
| 1151 | 1205 | ||
| 1152 | // Clean up empty relay entry | 1206 | // Clean up empty relay entry |
| 1153 | if batches.is_empty() { | 1207 | if batches.is_empty() { |
| @@ -1159,6 +1213,12 @@ impl SyncManager { | |||
| 1159 | 1213 | ||
| 1160 | // 4. Confirm the batch (moves items to RelayState) | 1214 | // 4. Confirm the batch (moves items to RelayState) |
| 1161 | self.confirm_batch(relay_url, completed_batch).await; | 1215 | self.confirm_batch(relay_url, completed_batch).await; |
| 1216 | |||
| 1217 | // 5. For generic filter (announcement) batches, recompute sync filters so any | ||
| 1218 | // purgatory repos registered during this batch get state-only subscriptions triggered. | ||
| 1219 | if is_generic { | ||
| 1220 | self.recompute_new_sync_filters_for_relay(relay_url).await; | ||
| 1221 | } | ||
| 1162 | } | 1222 | } |
| 1163 | 1223 | ||
| 1164 | /// Confirm a completed batch by moving items to RelayState | 1224 | /// Confirm a completed batch by moving items to RelayState |
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs index db16c62..70c3dbf 100644 --- a/src/sync/self_subscriber.rs +++ b/src/sync/self_subscriber.rs | |||
| @@ -478,6 +478,10 @@ impl SelfSubscriber { | |||
| 478 | root_events: HashSet::new(), | 478 | root_events: HashSet::new(), |
| 479 | sync_level: SyncLevel::Full, | 479 | sync_level: SyncLevel::Full, |
| 480 | }); | 480 | }); |
| 481 | // Upgrade sync_level to Full - this handles the case where the entry | ||
| 482 | // already exists as StateOnly (purgatory announcement) and is now being | ||
| 483 | // promoted (git data arrived and the event was broadcast via notify_event). | ||
| 484 | entry.sync_level = SyncLevel::Full; | ||
| 481 | entry.relays.extend(needs.relays); | 485 | entry.relays.extend(needs.relays); |
| 482 | entry.root_events.extend(needs.root_events); | 486 | entry.root_events.extend(needs.root_events); |
| 483 | 487 | ||