upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs7
-rw-r--r--src/nostr/builder.rs54
-rw-r--r--src/nostr/policy/mod.rs19
-rw-r--r--src/sync/mod.rs66
-rw-r--r--src/sync/self_subscriber.rs4
5 files changed, 147 insertions, 3 deletions
diff --git a/src/main.rs b/src/main.rs
index ab6ede7..ebe05a3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -132,6 +132,13 @@ async fn main() -> Result<()> {
132 // Get a reference to the rejected events index for shutdown persistence 132 // Get a reference to the rejected events index for shutdown persistence
133 let shutdown_rejected_index = sync_manager.rejected_events_index(); 133 let shutdown_rejected_index = sync_manager.rejected_events_index();
134 134
135 // Wire repo_sync_index into write policy so user-submitted purgatory announcements
136 // get registered for state event sync immediately (Fix 3).
137 let repo_sync_index = sync_manager.repo_sync_index();
138 relay_with_db
139 .write_policy
140 .set_repo_sync_index(repo_sync_index);
141
135 tokio::spawn(async move { 142 tokio::spawn(async move {
136 sync_manager.run().await; 143 sync_manager.run().await;
137 }); 144 });
diff --git a/src/nostr/builder.rs b/src/nostr/builder.rs
index aff12a6..8d1e461 100644
--- a/src/nostr/builder.rs
+++ b/src/nostr/builder.rs
@@ -17,6 +17,7 @@ use crate::nostr::policy::{
17 AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult, 17 AnnouncementPolicy, AnnouncementResult, PolicyContext, PrEventPolicy, ReferenceResult,
18 RelatedEventPolicy, StatePolicy, StateResult, 18 RelatedEventPolicy, StatePolicy, StateResult,
19}; 19};
20use crate::sync::{RepoSyncIndex, RepoSyncNeeds, SyncLevel};
20 21
21/// Type alias for the shared database used by the relay 22/// Type alias for the shared database used by the relay
22pub type SharedDatabase = Arc<dyn NostrDatabase>; 23pub type SharedDatabase = Arc<dyn NostrDatabase>;
@@ -98,6 +99,14 @@ impl Nip34WritePolicy {
98 self.ctx.set_local_relay(relay); 99 self.ctx.set_local_relay(relay);
99 } 100 }
100 101
102 /// Set the repo sync index so that user-submitted purgatory announcements can
103 /// be registered for state event sync immediately.
104 ///
105 /// This must be called after SyncManager is created.
106 pub fn set_repo_sync_index(&self, index: RepoSyncIndex) {
107 self.ctx.set_repo_sync_index(index);
108 }
109
101 /// Handle repository announcement event 110 /// Handle repository announcement event
102 async fn handle_announcement(&self, event: &Event) -> WritePolicyResult { 111 async fn handle_announcement(&self, event: &Event) -> WritePolicyResult {
103 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex()); 112 let event_id_str = event.id.to_bech32().unwrap_or_else(|_| event.id.to_hex());
@@ -146,6 +155,51 @@ impl Nip34WritePolicy {
146 "Accepted announcement to purgatory: {} (waiting for git data)", 155 "Accepted announcement to purgatory: {} (waiting for git data)",
147 event_id_str 156 event_id_str
148 ); 157 );
158
159 // Register repo in repo_sync_index with StateOnly level so that
160 // state event sync starts promptly via the next batch EOSE recompute.
161 // This handles user-submitted purgatory announcements - the SelfSubscriber
162 // only sees DB events, so it won't pick these up automatically.
163 if let Some(repo_sync_index) = self.ctx.get_repo_sync_index() {
164 if let Ok(announcement) =
165 RepositoryAnnouncement::from_event(event.clone())
166 {
167 use std::collections::HashSet;
168 let repo_id = format!(
169 "30617:{}:{}",
170 event.pubkey,
171 announcement.identifier
172 );
173
174 // Extract relay URLs from the announcement event tags
175 let relays: HashSet<String> = event
176 .tags
177 .iter()
178 .flat_map(|tag| {
179 let tag_vec = tag.as_slice();
180 if !tag_vec.is_empty() && tag_vec[0] == "relays" {
181 tag_vec[1..].iter().map(|s| s.to_string()).collect::<Vec<_>>()
182 } else {
183 vec![]
184 }
185 })
186 .collect();
187
188 let mut index = repo_sync_index.write().await;
189 index.entry(repo_id.clone()).or_insert_with(|| RepoSyncNeeds {
190 relays,
191 root_events: HashSet::new(),
192 sync_level: SyncLevel::StateOnly,
193 });
194 drop(index);
195
196 tracing::debug!(
197 repo_id = %repo_id,
198 "Registered purgatory announcement in repo_sync_index as StateOnly"
199 );
200 }
201 }
202
149 WritePolicyResult::Reject { 203 WritePolicyResult::Reject {
150 status: true, // Client sees OK 204 status: true, // Client sees OK
151 message: "purgatory: won't be served until git data arrives".into(), 205 message: "purgatory: won't be served until git data arrives".into(),
diff --git a/src/nostr/policy/mod.rs b/src/nostr/policy/mod.rs
index 1566b6c..c958586 100644
--- a/src/nostr/policy/mod.rs
+++ b/src/nostr/policy/mod.rs
@@ -20,6 +20,7 @@ pub use crate::git::sync::AlignmentResult;
20 20
21use super::SharedDatabase; 21use super::SharedDatabase;
22use crate::purgatory::Purgatory; 22use crate::purgatory::Purgatory;
23use crate::sync::RepoSyncIndex;
23use nostr_relay_builder::LocalRelay; 24use nostr_relay_builder::LocalRelay;
24use std::sync::Arc; 25use std::sync::Arc;
25 26
@@ -34,6 +35,8 @@ pub struct PolicyContext {
34 pub local_relay: Arc<std::sync::RwLock<Option<LocalRelay>>>, 35 pub local_relay: Arc<std::sync::RwLock<Option<LocalRelay>>>,
35 /// Configuration reference for policy settings (includes blacklists) 36 /// Configuration reference for policy settings (includes blacklists)
36 pub config: crate::config::Config, 37 pub config: crate::config::Config,
38 /// Repo sync index for registering purgatory announcements (set after SyncManager creation)
39 pub repo_sync_index: Arc<std::sync::RwLock<Option<RepoSyncIndex>>>,
37} 40}
38 41
39impl PolicyContext { 42impl PolicyContext {
@@ -51,6 +54,7 @@ impl PolicyContext {
51 purgatory, 54 purgatory,
52 local_relay: Arc::new(std::sync::RwLock::new(None)), 55 local_relay: Arc::new(std::sync::RwLock::new(None)),
53 config, 56 config,
57 repo_sync_index: Arc::new(std::sync::RwLock::new(None)),
54 } 58 }
55 } 59 }
56 60
@@ -68,4 +72,19 @@ impl PolicyContext {
68 let guard = self.local_relay.read().unwrap(); 72 let guard = self.local_relay.read().unwrap();
69 guard.clone() 73 guard.clone()
70 } 74 }
75
76 /// Set the repo sync index after SyncManager has been created.
77 ///
78 /// This allows purgatory announcements submitted by users to be registered
79 /// in the sync index so state event sync starts promptly.
80 pub fn set_repo_sync_index(&self, index: RepoSyncIndex) {
81 let mut guard = self.repo_sync_index.write().unwrap();
82 *guard = Some(index);
83 }
84
85 /// Get a clone of the repo sync index if it has been set.
86 pub fn get_repo_sync_index(&self) -> Option<RepoSyncIndex> {
87 let guard = self.repo_sync_index.read().unwrap();
88 guard.clone()
89 }
71} 90}
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 519017b..916e2b0 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -700,6 +700,14 @@ impl SyncManager {
700 self.rejected_events_index.save_to_disk(path) 700 self.rejected_events_index.save_to_disk(path)
701 } 701 }
702 702
703 /// Get a clone of the repo sync index Arc.
704 ///
705 /// This allows the write policy to register user-submitted purgatory announcements
706 /// in the sync index so that state event sync starts promptly.
707 pub fn repo_sync_index(&self) -> RepoSyncIndex {
708 self.repo_sync_index.clone()
709 }
710
703 /// Handle EOSE (End Of Stored Events) for a subscription 711 /// Handle EOSE (End Of Stored Events) for a subscription
704 /// 712 ///
705 /// This method: 713 /// This method:
@@ -951,9 +959,29 @@ impl SyncManager {
951 959
952 // Create REQ+EOSE subscriptions using original semantic filters 960 // Create REQ+EOSE subscriptions using original semantic filters
953 // This queries by kind/author/tags instead of by ID, which may 961 // This queries by kind/author/tags instead of by ID, which may
954 // succeed even when ID-based queries fail 962 // succeed even when ID-based queries fail.
955 let fallback_filters = filters::build_layer2_and_layer3_filters( 963 // Split batch_repos by SyncLevel to avoid sending Layer 2 filters
956 &batch_repos, 964 // (#a/#A/#q) for StateOnly (purgatory) repos - those PRs would be
965 // rejected as orphan and then silently dropped by nostr-sdk deduplication.
966 let (full_repos, state_only_repos) = {
967 let repo_index = self.repo_sync_index.read().await;
968 let mut full = HashSet::new();
969 let mut state_only = HashSet::new();
970 for repo_ref in &batch_repos {
971 match repo_index.get(repo_ref).map(|n| n.sync_level) {
972 Some(SyncLevel::StateOnly) => {
973 state_only.insert(repo_ref.clone());
974 }
975 _ => {
976 full.insert(repo_ref.clone());
977 }
978 }
979 }
980 (full, state_only)
981 };
982 let fallback_filters = filters::build_sync_level_aware_filters(
983 &full_repos,
984 &state_only_repos,
957 &batch_root_events, 985 &batch_root_events,
958 None, 986 None,
959 ); 987 );
@@ -1033,12 +1061,24 @@ impl SyncManager {
1033 { 1061 {
1034 let mut completed_batch = batches.remove(idx); 1062 let mut completed_batch = batches.remove(idx);
1035 completed_batch.failed = true; // Mark as failed 1063 completed_batch.failed = true; // Mark as failed
1064 let is_generic =
1065 completed_batch.items.repos.is_empty()
1066 && completed_batch.items.root_events.is_empty();
1036 if batches.is_empty() { 1067 if batches.is_empty() {
1037 pending.remove(&relay_url_for_fallback); 1068 pending.remove(&relay_url_for_fallback);
1038 } 1069 }
1039 drop(pending); 1070 drop(pending);
1040 self.confirm_batch(&relay_url_for_fallback, completed_batch) 1071 self.confirm_batch(&relay_url_for_fallback, completed_batch)
1041 .await; 1072 .await;
1073 // For generic filter (announcement) batches, recompute filters
1074 // so any purgatory repos registered during this batch get
1075 // state-only subscriptions triggered.
1076 if is_generic {
1077 self.recompute_new_sync_filters_for_relay(
1078 &relay_url_for_fallback,
1079 )
1080 .await;
1081 }
1042 } 1082 }
1043 } 1083 }
1044 return; 1084 return;
@@ -1132,12 +1172,24 @@ impl SyncManager {
1132 if let Some(batches) = pending.get_mut(&relay_url_for_retry) { 1172 if let Some(batches) = pending.get_mut(&relay_url_for_retry) {
1133 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) { 1173 if let Some(idx) = batches.iter().position(|b| b.batch_id == batch_id) {
1134 let completed_batch = batches.remove(idx); 1174 let completed_batch = batches.remove(idx);
1175 let is_generic =
1176 completed_batch.items.repos.is_empty()
1177 && completed_batch.items.root_events.is_empty();
1135 if batches.is_empty() { 1178 if batches.is_empty() {
1136 pending.remove(&relay_url_for_retry); 1179 pending.remove(&relay_url_for_retry);
1137 } 1180 }
1138 drop(pending); 1181 drop(pending);
1139 self.confirm_batch(&relay_url_for_retry, completed_batch) 1182 self.confirm_batch(&relay_url_for_retry, completed_batch)
1140 .await; 1183 .await;
1184 // For generic filter (announcement) batches, recompute filters
1185 // so any purgatory repos registered during this batch get
1186 // state-only subscriptions triggered.
1187 if is_generic {
1188 self.recompute_new_sync_filters_for_relay(
1189 &relay_url_for_retry,
1190 )
1191 .await;
1192 }
1141 } 1193 }
1142 } 1194 }
1143 return; 1195 return;
@@ -1148,6 +1200,8 @@ impl SyncManager {
1148 1200
1149 // 3. Batch complete - extract and remove 1201 // 3. Batch complete - extract and remove
1150 let completed_batch = batches.remove(batch_idx); 1202 let completed_batch = batches.remove(batch_idx);
1203 let is_generic = completed_batch.items.repos.is_empty()
1204 && completed_batch.items.root_events.is_empty();
1151 1205
1152 // Clean up empty relay entry 1206 // Clean up empty relay entry
1153 if batches.is_empty() { 1207 if batches.is_empty() {
@@ -1159,6 +1213,12 @@ impl SyncManager {
1159 1213
1160 // 4. Confirm the batch (moves items to RelayState) 1214 // 4. Confirm the batch (moves items to RelayState)
1161 self.confirm_batch(relay_url, completed_batch).await; 1215 self.confirm_batch(relay_url, completed_batch).await;
1216
1217 // 5. For generic filter (announcement) batches, recompute sync filters so any
1218 // purgatory repos registered during this batch get state-only subscriptions triggered.
1219 if is_generic {
1220 self.recompute_new_sync_filters_for_relay(relay_url).await;
1221 }
1162 } 1222 }
1163 1223
1164 /// Confirm a completed batch by moving items to RelayState 1224 /// Confirm a completed batch by moving items to RelayState
diff --git a/src/sync/self_subscriber.rs b/src/sync/self_subscriber.rs
index db16c62..70c3dbf 100644
--- a/src/sync/self_subscriber.rs
+++ b/src/sync/self_subscriber.rs
@@ -478,6 +478,10 @@ impl SelfSubscriber {
478 root_events: HashSet::new(), 478 root_events: HashSet::new(),
479 sync_level: SyncLevel::Full, 479 sync_level: SyncLevel::Full,
480 }); 480 });
481 // Upgrade sync_level to Full - this handles the case where the entry
482 // already exists as StateOnly (purgatory announcement) and is now being
483 // promoted (git data arrived and the event was broadcast via notify_event).
484 entry.sync_level = SyncLevel::Full;
481 entry.relays.extend(needs.relays); 485 entry.relays.extend(needs.relays);
482 entry.root_events.extend(needs.root_events); 486 entry.root_events.extend(needs.root_events);
483 487