anvil_zksync_core/node/
pool.rs

1use crate::node::impersonate::ImpersonationManager;
2use anvil_zksync_types::{TransactionOrder, TransactionPriority};
3use futures::channel::mpsc::{channel, Receiver, Sender};
4use std::cmp::Ordering;
5use std::collections::BTreeSet;
6use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
7use zksync_types::{Transaction, H256};
8
9#[derive(Debug, Clone)]
10pub struct TxPool {
11    inner: Arc<RwLock<BTreeSet<PoolTransaction>>>,
12    /// Transaction ordering in the mempool.
13    transaction_order: Arc<RwLock<TransactionOrder>>,
14    /// Used to preserve transactions submission order in the pool
15    submission_number: Arc<Mutex<u64>>,
16    /// Listeners for new transactions' hashes
17    tx_listeners: Arc<Mutex<Vec<Sender<H256>>>>,
18    pub(crate) impersonation: ImpersonationManager,
19}
20
21impl TxPool {
22    pub fn new(impersonation: ImpersonationManager, transaction_order: TransactionOrder) -> Self {
23        Self {
24            inner: Arc::new(RwLock::new(BTreeSet::new())),
25            submission_number: Arc::new(Mutex::new(0)),
26            tx_listeners: Arc::new(Mutex::new(Vec::new())),
27            impersonation,
28            transaction_order: Arc::new(RwLock::new(transaction_order)),
29        }
30    }
31
32    fn lock_submission_number(&self) -> MutexGuard<'_, u64> {
33        self.submission_number
34            .lock()
35            .expect("submission_number lock is poisoned")
36    }
37
38    fn read_transaction_order(&self) -> RwLockReadGuard<'_, TransactionOrder> {
39        self.transaction_order
40            .read()
41            .expect("transaction_order lock is poisoned")
42    }
43
44    pub fn add_tx(&self, tx: Transaction) {
45        let hash = tx.hash();
46        let priority = self.read_transaction_order().priority(&tx);
47        let mut submission_number = self.lock_submission_number();
48        *submission_number = submission_number.wrapping_add(1);
49
50        let mut guard = self.inner.write().expect("TxPool lock is poisoned");
51        guard.insert(PoolTransaction {
52            transaction: tx,
53            submission_number: *submission_number,
54            priority,
55        });
56        self.notify_listeners(hash);
57    }
58
59    pub fn add_txs(&self, txs: Vec<Transaction>) {
60        let transaction_order = self.read_transaction_order();
61        let mut submission_number = self.lock_submission_number();
62
63        let mut guard = self.inner.write().expect("TxPool lock is poisoned");
64        for tx in txs {
65            let hash = tx.hash();
66            let priority = transaction_order.priority(&tx);
67            *submission_number = submission_number.wrapping_add(1);
68            guard.insert(PoolTransaction {
69                transaction: tx,
70                submission_number: *submission_number,
71                priority,
72            });
73            self.notify_listeners(hash);
74        }
75    }
76
77    /// Removes a single transaction from the pool
78    pub fn drop_transaction(&self, hash: H256) -> Option<Transaction> {
79        let dropped = self.drop_transactions(|tx| tx.transaction.hash() == hash);
80        dropped.first().cloned()
81    }
82
83    /// Remove transactions matching the specified condition
84    pub fn drop_transactions<F>(&self, f: F) -> Vec<Transaction>
85    where
86        F: Fn(&PoolTransaction) -> bool,
87    {
88        let mut guard = self.inner.write().expect("TxPool lock is poisoned");
89        let txs = std::mem::take(&mut *guard);
90        let (matching_txs, other_txs) = txs.into_iter().partition(f);
91        *guard = other_txs;
92        matching_txs.into_iter().map(|tx| tx.transaction).collect()
93    }
94
95    /// Removes all transactions from the pool
96    pub fn clear(&self) {
97        let mut guard = self.inner.write().expect("TxPool lock is poisoned");
98        guard.clear();
99    }
100
101    /// Take up to `n` continuous transactions from the pool that are all uniform in impersonation
102    /// type (either all are impersonating or all non-impersonating).
103    // TODO: We should distinguish ready transactions from non-ready ones. Only ready txs should be takeable.
104    pub fn take_uniform(&self, n: usize) -> Option<TxBatch> {
105        if n == 0 {
106            return None;
107        }
108        let mut guard = self.inner.write().expect("TxPool lock is poisoned");
109        let Some(head_tx) = guard.pop_last() else {
110            // Pool is empty
111            return None;
112        };
113        let mut taken_txs = vec![];
114        let impersonating = self.impersonation.inspect(|state| {
115            // First tx's impersonation status decides what all other txs' impersonation status is
116            // expected to be.
117            let impersonating = state.is_impersonating(&head_tx.transaction.initiator_account());
118            taken_txs.insert(0, head_tx.transaction);
119            let mut taken_txs_number = 1;
120
121            while taken_txs_number < n {
122                let Some(next_tx) = guard.last() else {
123                    break;
124                };
125                if impersonating != state.is_impersonating(&next_tx.transaction.initiator_account())
126                {
127                    break;
128                }
129                taken_txs.insert(taken_txs_number, guard.pop_last().unwrap().transaction);
130                taken_txs_number += 1;
131            }
132            impersonating
133        });
134
135        Some(TxBatch {
136            impersonating,
137            txs: taken_txs,
138        })
139    }
140
141    /// Adds a new transaction listener to the pool that gets notified about every new transaction.
142    pub fn add_tx_listener(&self) -> Receiver<H256> {
143        const TX_LISTENER_BUFFER_SIZE: usize = 2048;
144        let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
145        self.tx_listeners
146            .lock()
147            .expect("TxPool lock is poisoned")
148            .push(tx);
149        rx
150    }
151
152    /// Notifies all listeners about the transaction.
153    fn notify_listeners(&self, tx_hash: H256) {
154        let mut tx_listeners = self.tx_listeners.lock().expect("TxPool lock is poisoned");
155        tx_listeners.retain_mut(|listener| match listener.try_send(tx_hash) {
156            Ok(()) => true,
157            Err(e) => {
158                if e.is_full() {
159                    tracing::warn!(
160                        %tx_hash,
161                        "Failed to send transaction notification because channel is full",
162                    );
163                    true
164                } else {
165                    false
166                }
167            }
168        });
169    }
170}
171
// Helpers that are only compiled for tests
#[cfg(test)]
impl TxPool {
    /// Adds `N` transactions built by the testing `TransactionBuilder` to the pool, none of them
    /// marked as impersonated, and returns them in submission order.
    pub fn populate<const N: usize>(&self) -> [Transaction; N] {
        self.populate_impersonate([false; N])
    }

    /// Adds `N` transactions built by the testing `TransactionBuilder` to the pool and returns
    /// them in submission order. The initiator of the `i`-th transaction is registered as
    /// impersonated first whenever `to_impersonate[i]` is `true`.
    pub fn populate_impersonate<const N: usize>(
        &self,
        to_impersonate: [bool; N],
    ) -> [Transaction; N] {
        to_impersonate.map(|impersonated| {
            let built = crate::testing::TransactionBuilder::new().build();
            let tx: Transaction = built.into();

            if impersonated {
                assert!(self.impersonation.impersonate(tx.initiator_account()));
            }

            // The pool gets its own copy; the original is handed back to the caller.
            self.add_tx(tx.clone());
            tx
        })
    }
}
199
200/// A batch of transactions meant to be sealed as a block. All transactions in the batch share the
201/// same impersonation status on the moment of the batch's creation.
202///
203/// A block produced from this batch is guaranteed to:
204/// * Not contain any transactions outside of this transaction batch
205/// * Use contracts matching `impersonating` mode of this transaction batch.
206///
207/// Potential caveats:
208/// * The impersonation status of transactions' initiators (as defined by [`ImpersonationManager`])
209///   is not guaranteed to be the same by the time the batch gets executed
210/// * The resulting block is not guaranteed to contain all transactions as some of them could be
211///   non-executable.
212#[derive(PartialEq, Debug)]
213pub struct TxBatch {
214    pub impersonating: bool,
215    pub txs: Vec<Transaction>,
216}
217
218/// A reference to a transaction in the pool
219#[derive(Clone, Debug)]
220pub struct PoolTransaction {
221    /// actual transaction
222    pub transaction: Transaction,
223    /// Used to internally compare the transaction in the pool
224    pub submission_number: u64,
225    /// priority of the transaction
226    pub priority: TransactionPriority,
227}
228
229impl Eq for PoolTransaction {}
230
231impl PartialEq<Self> for PoolTransaction {
232    fn eq(&self, other: &Self) -> bool {
233        self.cmp(other) == Ordering::Equal
234    }
235}
236
237impl PartialOrd<Self> for PoolTransaction {
238    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
239        Some(self.cmp(other))
240    }
241}
242
243impl Ord for PoolTransaction {
244    fn cmp(&self, other: &Self) -> Ordering {
245        self.priority
246            .cmp(&other.priority)
247            .then_with(|| other.submission_number.cmp(&self.submission_number))
248    }
249}
250
#[cfg(test)]
mod tests {
    use crate::node::impersonate::ImpersonationState;
    use crate::node::pool::TxBatch;
    use crate::node::{ImpersonationManager, TxPool};
    use crate::testing;
    use anvil_zksync_types::TransactionOrder;
    use test_case::test_case;
    use zksync_types::{Transaction, U256};

    /// Creates an empty FIFO-ordered pool with a default impersonation manager.
    fn fifo_pool() -> TxPool {
        TxPool::new(ImpersonationManager::default(), TransactionOrder::Fifo)
    }

    /// Builds the value a successful `take_uniform` call is expected to return.
    fn batch_of(impersonating: bool, txs: Vec<Transaction>) -> Option<TxBatch> {
        Some(TxBatch { impersonating, txs })
    }

    #[test]
    fn take_from_empty() {
        let pool = fifo_pool();
        assert_eq!(pool.take_uniform(1), None);
    }

    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_zero(imp: bool) {
        let pool = fifo_pool();

        pool.populate_impersonate([imp]);
        assert_eq!(pool.take_uniform(0), None);
    }

    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_exactly_one(imp: bool) {
        let pool = fifo_pool();

        let [tx0, ..] = pool.populate_impersonate([imp, false]);
        assert_eq!(pool.take_uniform(1), batch_of(imp, vec![tx0]));
    }

    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_exactly_two(imp: bool) {
        let pool = fifo_pool();

        let [tx0, tx1, ..] = pool.populate_impersonate([imp, imp, false]);
        assert_eq!(pool.take_uniform(2), batch_of(imp, vec![tx0, tx1]));
    }

    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_one_eligible(imp: bool) {
        let pool = fifo_pool();

        let [tx0, ..] = pool.populate_impersonate([imp, !imp, !imp, !imp]);
        assert_eq!(pool.take_uniform(4), batch_of(imp, vec![tx0]));
    }

    // Three transactions: the first two share an impersonation status, the third differs.
    // Requesting three must therefore yield only the first two.
    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_two_when_third_is_not_uniform(imp: bool) {
        let pool = fifo_pool();

        let [tx0, tx1, ..] = pool.populate_impersonate([imp, imp, !imp]);
        assert_eq!(pool.take_uniform(3), batch_of(imp, vec![tx0, tx1]));
    }

    // Four transactions: the first, second and fourth share an impersonation status, the third
    // differs. Requesting four must yield only the first two.
    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_interrupted_by_non_uniformness(imp: bool) {
        let pool = fifo_pool();

        let [tx0, tx1, ..] = pool.populate_impersonate([imp, imp, !imp, imp]);
        assert_eq!(pool.take_uniform(4), batch_of(imp, vec![tx0, tx1]));
    }

    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_multiple(imp: bool) {
        let pool = fifo_pool();

        let [tx0, tx1, tx2, tx3] = pool.populate_impersonate([imp, !imp, !imp, imp]);
        assert_eq!(pool.take_uniform(100), batch_of(imp, vec![tx0]));
        assert_eq!(pool.take_uniform(100), batch_of(!imp, vec![tx1, tx2]));
        assert_eq!(pool.take_uniform(100), batch_of(imp, vec![tx3]));
    }

    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn pool_clones_share_state(imp: bool) {
        let pool = fifo_pool();

        // Populate through a clone that is dropped before the original is read.
        let txs = {
            let pool_clone = pool.clone();
            pool_clone.populate_impersonate([imp, imp, imp])
        };
        assert_eq!(pool.take_uniform(3), batch_of(imp, txs.to_vec()));
    }

    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_multiple_from_clones(imp: bool) {
        let pool = fifo_pool();

        let [tx0, tx1, tx2, tx3] = {
            let pool_clone = pool.clone();
            pool_clone.populate_impersonate([imp, !imp, !imp, imp])
        };

        // Each batch is taken through a different clone of the same pool.
        let pool0 = pool.clone();
        assert_eq!(pool0.take_uniform(100), batch_of(imp, vec![tx0]));
        let pool1 = pool.clone();
        assert_eq!(pool1.take_uniform(100), batch_of(!imp, vec![tx1, tx2]));
        let pool2 = pool.clone();
        assert_eq!(pool2.take_uniform(100), batch_of(imp, vec![tx3]));
    }

    #[test_case(false ; "not impersonated")]
    #[test_case(true  ; "is impersonated")]
    fn take_respects_impersonation_change(imp: bool) {
        let pool = fifo_pool();

        let [tx0, tx1, tx2, tx3] = pool.populate_impersonate([imp, imp, !imp, imp]);
        assert_eq!(pool.take_uniform(4), batch_of(imp, vec![tx0, tx1]));

        // Flip the impersonation status of tx2's initiator so that it now equals `imp`.
        let tx2_initiator = tx2.initiator_account();
        if imp {
            pool.impersonation.impersonate(tx2_initiator);
        } else {
            pool.impersonation.stop_impersonating(&tx2_initiator);
        }

        assert_eq!(pool.take_uniform(4), batch_of(imp, vec![tx2, tx3]));
    }

    #[tokio::test]
    async fn take_uses_consistent_impersonation() {
        const TX_COUNT: usize = 4096;

        let impersonation = ImpersonationManager::default();
        let pool = TxPool::new(impersonation.clone(), TransactionOrder::Fifo);

        for _ in 0..TX_COUNT {
            let tx: Transaction = testing::TransactionBuilder::new().build().into();

            assert!(pool.impersonation.impersonate(tx.initiator_account()));

            pool.add_tx(tx);
        }

        let take_handle = tokio::spawn(async move { pool.take_uniform(TX_COUNT) });
        let clear_impersonation_handle =
            tokio::spawn(async move { impersonation.set_state(ImpersonationState::default()) });

        clear_impersonation_handle.await.unwrap();
        let tx_batch = take_handle
            .await
            .unwrap()
            .expect("failed to take a tx batch");
        // The impersonation flag of the batch is deliberately not asserted: depending on which
        // task wins the race, both `true` and `false` are valid. What must hold is that the
        // batch is complete, i.e. `TxPool` never observes the impersonation state changing
        // partway through its walk over the transactions.
        assert_eq!(tx_batch.txs.len(), TX_COUNT);
    }

    #[tokio::test]
    async fn take_uses_transaction_order() {
        let impersonation = ImpersonationManager::default();
        let pool_fifo = TxPool::new(impersonation.clone(), TransactionOrder::Fifo);
        let pool_fees = TxPool::new(impersonation, TransactionOrder::Fees);

        // Submit the same transactions, with strictly increasing max fee, to both pools.
        let mut txs: Vec<Transaction> = Vec::new();
        for index in [1, 2, 3].iter() {
            let tx: Transaction = testing::TransactionBuilder::new()
                .set_max_fee_per_gas(U256::from(50_000_000 + index))
                .build()
                .into();
            pool_fifo.add_tx(tx.clone());
            pool_fees.add_tx(tx.clone());
            txs.push(tx);
        }

        // FIFO pool yields transactions in submission order.
        assert_eq!(pool_fifo.take_uniform(3), batch_of(false, txs.clone()));

        // Fee-ordered pool yields the transaction with the highest fee first.
        let by_descending_fee: Vec<Transaction> = txs.iter().rev().cloned().collect();
        assert_eq!(pool_fees.take_uniform(3), batch_of(false, by_descending_fee));
    }
}