1use crate::node::impersonate::ImpersonationManager;
2use anvil_zksync_types::{TransactionOrder, TransactionPriority};
3use futures::channel::mpsc::{channel, Receiver, Sender};
4use std::cmp::Ordering;
5use std::collections::BTreeSet;
6use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
7use zksync_types::{Transaction, H256};
8
9#[derive(Debug, Clone)]
10pub struct TxPool {
11 inner: Arc<RwLock<BTreeSet<PoolTransaction>>>,
12 transaction_order: Arc<RwLock<TransactionOrder>>,
14 submission_number: Arc<Mutex<u64>>,
16 tx_listeners: Arc<Mutex<Vec<Sender<H256>>>>,
18 pub(crate) impersonation: ImpersonationManager,
19}
20
21impl TxPool {
22 pub fn new(impersonation: ImpersonationManager, transaction_order: TransactionOrder) -> Self {
23 Self {
24 inner: Arc::new(RwLock::new(BTreeSet::new())),
25 submission_number: Arc::new(Mutex::new(0)),
26 tx_listeners: Arc::new(Mutex::new(Vec::new())),
27 impersonation,
28 transaction_order: Arc::new(RwLock::new(transaction_order)),
29 }
30 }
31
32 fn lock_submission_number(&self) -> MutexGuard<'_, u64> {
33 self.submission_number
34 .lock()
35 .expect("submission_number lock is poisoned")
36 }
37
38 fn read_transaction_order(&self) -> RwLockReadGuard<'_, TransactionOrder> {
39 self.transaction_order
40 .read()
41 .expect("transaction_order lock is poisoned")
42 }
43
44 pub fn add_tx(&self, tx: Transaction) {
45 let hash = tx.hash();
46 let priority = self.read_transaction_order().priority(&tx);
47 let mut submission_number = self.lock_submission_number();
48 *submission_number = submission_number.wrapping_add(1);
49
50 let mut guard = self.inner.write().expect("TxPool lock is poisoned");
51 guard.insert(PoolTransaction {
52 transaction: tx,
53 submission_number: *submission_number,
54 priority,
55 });
56 self.notify_listeners(hash);
57 }
58
59 pub fn add_txs(&self, txs: Vec<Transaction>) {
60 let transaction_order = self.read_transaction_order();
61 let mut submission_number = self.lock_submission_number();
62
63 let mut guard = self.inner.write().expect("TxPool lock is poisoned");
64 for tx in txs {
65 let hash = tx.hash();
66 let priority = transaction_order.priority(&tx);
67 *submission_number = submission_number.wrapping_add(1);
68 guard.insert(PoolTransaction {
69 transaction: tx,
70 submission_number: *submission_number,
71 priority,
72 });
73 self.notify_listeners(hash);
74 }
75 }
76
77 pub fn drop_transaction(&self, hash: H256) -> Option<Transaction> {
79 let dropped = self.drop_transactions(|tx| tx.transaction.hash() == hash);
80 dropped.first().cloned()
81 }
82
83 pub fn drop_transactions<F>(&self, f: F) -> Vec<Transaction>
85 where
86 F: Fn(&PoolTransaction) -> bool,
87 {
88 let mut guard = self.inner.write().expect("TxPool lock is poisoned");
89 let txs = std::mem::take(&mut *guard);
90 let (matching_txs, other_txs) = txs.into_iter().partition(f);
91 *guard = other_txs;
92 matching_txs.into_iter().map(|tx| tx.transaction).collect()
93 }
94
95 pub fn clear(&self) {
97 let mut guard = self.inner.write().expect("TxPool lock is poisoned");
98 guard.clear();
99 }
100
101 pub fn take_uniform(&self, n: usize) -> Option<TxBatch> {
105 if n == 0 {
106 return None;
107 }
108 let mut guard = self.inner.write().expect("TxPool lock is poisoned");
109 let Some(head_tx) = guard.pop_last() else {
110 return None;
112 };
113 let mut taken_txs = vec![];
114 let impersonating = self.impersonation.inspect(|state| {
115 let impersonating = state.is_impersonating(&head_tx.transaction.initiator_account());
118 taken_txs.insert(0, head_tx.transaction);
119 let mut taken_txs_number = 1;
120
121 while taken_txs_number < n {
122 let Some(next_tx) = guard.last() else {
123 break;
124 };
125 if impersonating != state.is_impersonating(&next_tx.transaction.initiator_account())
126 {
127 break;
128 }
129 taken_txs.insert(taken_txs_number, guard.pop_last().unwrap().transaction);
130 taken_txs_number += 1;
131 }
132 impersonating
133 });
134
135 Some(TxBatch {
136 impersonating,
137 txs: taken_txs,
138 })
139 }
140
141 pub fn add_tx_listener(&self) -> Receiver<H256> {
143 const TX_LISTENER_BUFFER_SIZE: usize = 2048;
144 let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
145 self.tx_listeners
146 .lock()
147 .expect("TxPool lock is poisoned")
148 .push(tx);
149 rx
150 }
151
152 fn notify_listeners(&self, tx_hash: H256) {
154 let mut tx_listeners = self.tx_listeners.lock().expect("TxPool lock is poisoned");
155 tx_listeners.retain_mut(|listener| match listener.try_send(tx_hash) {
156 Ok(()) => true,
157 Err(e) => {
158 if e.is_full() {
159 tracing::warn!(
160 %tx_hash,
161 "Failed to send transaction notification because channel is full",
162 );
163 true
164 } else {
165 false
166 }
167 }
168 });
169 }
170}
171
// Test-only helpers for filling the pool with freshly built transactions.
#[cfg(test)]
impl TxPool {
    /// Adds `N` freshly built transactions to the pool, none of them impersonated,
    /// and returns them in insertion order.
    pub fn populate<const N: usize>(&self) -> [Transaction; N] {
        self.populate_impersonate([false; N])
    }

    /// Adds one freshly built transaction per flag in `to_impersonate` and returns
    /// them in insertion order.
    ///
    /// For every `true` flag the transaction's initiator is registered with the
    /// pool's impersonation manager before the transaction is added; the helper
    /// asserts that this registration returns `true`.
    pub fn populate_impersonate<const N: usize>(
        &self,
        to_impersonate: [bool; N],
    ) -> [Transaction; N] {
        to_impersonate.map(|should_impersonate| {
            let built: Transaction = crate::testing::TransactionBuilder::new().build().into();

            if should_impersonate {
                let initiator = built.initiator_account();
                assert!(self.impersonation.impersonate(initiator));
            }

            self.add_tx(built.clone());
            built
        })
    }
}
199
200#[derive(PartialEq, Debug)]
213pub struct TxBatch {
214 pub impersonating: bool,
215 pub txs: Vec<Transaction>,
216}
217
218#[derive(Clone, Debug)]
220pub struct PoolTransaction {
221 pub transaction: Transaction,
223 pub submission_number: u64,
225 pub priority: TransactionPriority,
227}
228
229impl Eq for PoolTransaction {}
230
231impl PartialEq<Self> for PoolTransaction {
232 fn eq(&self, other: &Self) -> bool {
233 self.cmp(other) == Ordering::Equal
234 }
235}
236
237impl PartialOrd<Self> for PoolTransaction {
238 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
239 Some(self.cmp(other))
240 }
241}
242
243impl Ord for PoolTransaction {
244 fn cmp(&self, other: &Self) -> Ordering {
245 self.priority
246 .cmp(&other.priority)
247 .then_with(|| other.submission_number.cmp(&self.submission_number))
248 }
249}
250
#[cfg(test)]
mod tests {
    use crate::node::impersonate::ImpersonationState;
    use crate::node::pool::TxBatch;
    use crate::node::{ImpersonationManager, TxPool};
    use crate::testing;
    use anvil_zksync_types::TransactionOrder;
    use test_case::test_case;
    use zksync_types::{Transaction, U256};

    /// Builds an empty FIFO-ordered pool with a default impersonation manager.
    fn fifo_pool() -> TxPool {
        TxPool::new(ImpersonationManager::default(), TransactionOrder::Fifo)
    }

    /// Shorthand for the value `take_uniform` is expected to return.
    fn batch(impersonating: bool, txs: Vec<Transaction>) -> Option<TxBatch> {
        Some(TxBatch { impersonating, txs })
    }

    /// Taking from an empty pool yields nothing.
    #[test]
    fn take_from_empty() {
        let pool = fifo_pool();
        assert_eq!(pool.take_uniform(1), None);
    }

    /// Asking for zero transactions yields nothing even if the pool is non-empty.
    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_zero(imp: bool) {
        let pool = fifo_pool();

        pool.populate_impersonate([imp]);
        assert_eq!(pool.take_uniform(0), None);
    }

    /// Only the requested single transaction is taken.
    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_exactly_one(imp: bool) {
        let pool = fifo_pool();

        let [tx0, ..] = pool.populate_impersonate([imp, false]);
        assert_eq!(pool.take_uniform(1), batch(imp, vec![tx0]));
    }

    /// Only the requested two transactions are taken.
    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_exactly_two(imp: bool) {
        let pool = fifo_pool();

        let [tx0, tx1, ..] = pool.populate_impersonate([imp, imp, false]);
        assert_eq!(pool.take_uniform(2), batch(imp, vec![tx0, tx1]));
    }

    /// Only the head is taken when everything after it has a different status.
    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_one_eligible(imp: bool) {
        let pool = fifo_pool();

        let [tx0, ..] = pool.populate_impersonate([imp, !imp, !imp, !imp]);
        assert_eq!(pool.take_uniform(4), batch(imp, vec![tx0]));
    }

    /// The batch stops before a third transaction with a different status.
    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_two_when_third_is_not_uniform(imp: bool) {
        let pool = fifo_pool();

        let [tx0, tx1, ..] = pool.populate_impersonate([imp, imp, !imp]);
        assert_eq!(pool.take_uniform(3), batch(imp, vec![tx0, tx1]));
    }

    /// A non-uniform transaction ends the batch even if a uniform one follows it.
    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_interrupted_by_non_uniformness(imp: bool) {
        let pool = fifo_pool();

        let [tx0, tx1, ..] = pool.populate_impersonate([imp, imp, !imp, imp]);
        assert_eq!(pool.take_uniform(4), batch(imp, vec![tx0, tx1]));
    }

    /// Consecutive takes drain the pool one uniform run at a time.
    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_multiple(imp: bool) {
        let pool = fifo_pool();

        let [tx0, tx1, tx2, tx3] = pool.populate_impersonate([imp, !imp, !imp, imp]);
        assert_eq!(pool.take_uniform(100), batch(imp, vec![tx0]));
        assert_eq!(pool.take_uniform(100), batch(!imp, vec![tx1, tx2]));
        assert_eq!(pool.take_uniform(100), batch(imp, vec![tx3]));
    }

    /// Transactions added through a clone are visible through the original pool.
    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn pool_clones_share_state(imp: bool) {
        let pool = fifo_pool();

        let txs = pool.clone().populate_impersonate([imp, imp, imp]);
        assert_eq!(pool.take_uniform(3), batch(imp, Vec::from(txs)));
    }

    /// Different clones observe and drain the same underlying pool.
    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_multiple_from_clones(imp: bool) {
        let pool = fifo_pool();

        let [tx0, tx1, tx2, tx3] = pool.clone().populate_impersonate([imp, !imp, !imp, imp]);

        let first_clone = pool.clone();
        assert_eq!(first_clone.take_uniform(100), batch(imp, vec![tx0]));

        let second_clone = pool.clone();
        assert_eq!(second_clone.take_uniform(100), batch(!imp, vec![tx1, tx2]));

        let third_clone = pool.clone();
        assert_eq!(third_clone.take_uniform(100), batch(imp, vec![tx3]));
    }

    /// Impersonation status is evaluated when taking, not when adding.
    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_respects_impersonation_change(imp: bool) {
        let pool = fifo_pool();

        let [tx0, tx1, tx2, tx3] = pool.populate_impersonate([imp, imp, !imp, imp]);
        assert_eq!(pool.take_uniform(4), batch(imp, vec![tx0, tx1]));

        // Flip tx2's status so that it now matches tx3.
        if imp {
            pool.impersonation.impersonate(tx2.initiator_account());
        } else {
            pool.impersonation
                .stop_impersonating(&tx2.initiator_account());
        }

        assert_eq!(pool.take_uniform(4), batch(imp, vec![tx2, tx3]));
    }

    /// A take racing with an impersonation reset still returns the full batch.
    #[tokio::test]
    async fn take_uses_consistent_impersonation() {
        let impersonation = ImpersonationManager::default();
        let pool = TxPool::new(impersonation.clone(), TransactionOrder::Fifo);

        for _ in 0..4096 {
            let tx: Transaction = testing::TransactionBuilder::new().build().into();
            assert!(pool.impersonation.impersonate(tx.initiator_account()));
            pool.add_tx(tx);
        }

        let take_task = tokio::spawn(async move { pool.take_uniform(4096) });
        let reset_task =
            tokio::spawn(async move { impersonation.set_state(ImpersonationState::default()) });

        reset_task.await.unwrap();
        let taken = take_task
            .await
            .unwrap()
            .expect("failed to take a tx batch");
        assert_eq!(taken.txs.len(), 4096);
    }

    /// FIFO returns transactions in submission order; fee ordering returns them
    /// in the reverse of the order in which increasing max fees were submitted.
    #[tokio::test]
    async fn take_uses_transaction_order() {
        let impersonation = ImpersonationManager::default();
        let pool_fifo = TxPool::new(impersonation.clone(), TransactionOrder::Fifo);
        let pool_fees = TxPool::new(impersonation.clone(), TransactionOrder::Fees);

        let txs: Vec<Transaction> = [1, 2, 3]
            .iter()
            .map(|index| {
                let tx: Transaction = testing::TransactionBuilder::new()
                    .set_max_fee_per_gas(U256::from(50_000_000 + index))
                    .build()
                    .into();
                pool_fifo.add_tx(tx.clone());
                pool_fees.add_tx(tx.clone());
                tx
            })
            .collect();

        let submitted_order = txs.clone();
        assert_eq!(pool_fifo.take_uniform(3), batch(false, submitted_order));

        let reversed_order: Vec<Transaction> = txs.iter().rev().cloned().collect();
        assert_eq!(pool_fees.take_uniform(3), batch(false, reversed_order));
    }
}