anvil_zksync_core/node/
sealer.rs

1use super::inner::node_executor::NodeExecutorHandle;
2use super::pool::{TxBatch, TxPool};
3use futures::channel::mpsc::Receiver;
4use futures::stream::{Fuse, StreamExt};
5use futures::task::AtomicWaker;
6use futures::Stream;
7use std::pin::Pin;
8use std::sync::{Arc, RwLock};
9use std::task::{Context, Poll};
10use std::time::Duration;
11use tokio::time::{Interval, MissedTickBehavior};
12use zksync_types::H256;
13
14// TODO: `BlockSealer` is probably a bad name as this doesn't actually seal blocks, just decides
15//       that certain tx batch needs to be sealed. The actual sealing is handled in `NodeExecutor`.
16//       Consider renaming.
17pub struct BlockSealer {
18    /// Block sealer state (externally mutable).
19    state: BlockSealerState,
20    /// Pool where block sealer is sourcing transactions from.
21    pool: TxPool,
22    /// Node handle to be used when a block needs to be sealed.
23    node_handle: NodeExecutorHandle,
24}
25
26impl BlockSealer {
27    pub fn new(
28        mode: BlockSealerMode,
29        pool: TxPool,
30        node_handle: NodeExecutorHandle,
31    ) -> (Self, BlockSealerState) {
32        let state = BlockSealerState {
33            mode: Arc::new(RwLock::new(mode)),
34            waker: Arc::new(AtomicWaker::new()),
35        };
36        (
37            Self {
38                state: state.clone(),
39                pool,
40                node_handle,
41            },
42            state,
43        )
44    }
45
46    pub async fn run(self) -> anyhow::Result<()> {
47        loop {
48            tracing::debug!("polling for a new tx batch");
49            let tx_batch = futures::future::poll_fn(|cx| {
50                // Register to be woken up when sealer mode changes
51                self.state.waker.register(cx.waker());
52                let mut mode = self
53                    .state
54                    .mode
55                    .write()
56                    .expect("BlockSealer lock is poisoned");
57                match &mut *mode {
58                    BlockSealerMode::Noop => Poll::Pending,
59                    BlockSealerMode::Immediate(immediate) => immediate.poll(&self.pool, cx),
60                    BlockSealerMode::FixedTime(fixed) => fixed.poll(&self.pool, cx),
61                }
62            })
63            .await;
64            tracing::debug!(
65                impersonating = tx_batch.impersonating,
66                txs = tx_batch.txs.len(),
67                "new tx batch found"
68            );
69            self.node_handle.seal_block(tx_batch).await?;
70        }
71    }
72}
73
74#[derive(Clone, Debug)]
75pub struct BlockSealerState {
76    /// The mode this sealer currently operates in
77    mode: Arc<RwLock<BlockSealerMode>>,
78    /// Used for task wake up when the sealing mode was forcefully changed
79    waker: Arc<AtomicWaker>,
80}
81
82impl BlockSealerState {
83    pub fn is_immediate(&self) -> bool {
84        matches!(
85            *self.mode.read().expect("BlockSealer lock is poisoned"),
86            BlockSealerMode::Immediate(_)
87        )
88    }
89
90    pub fn set_mode(&self, mode: BlockSealerMode) {
91        *self.mode.write().expect("BlockSealer lock is poisoned") = mode;
92        // Notify last used waker that the mode might have changed
93        self.waker.wake();
94    }
95}
96
97/// Represents different modes of block sealing available on the node
98#[derive(Debug)]
99pub enum BlockSealerMode {
100    /// Never seals blocks.
101    Noop,
102    /// Seals a block as soon as there is at least one transaction.
103    Immediate(ImmediateBlockSealer),
104    /// Seals a new block every `interval` tick
105    FixedTime(FixedTimeBlockSealer),
106}
107
108impl BlockSealerMode {
109    pub fn noop() -> Self {
110        Self::Noop
111    }
112
113    pub fn immediate(max_transactions: usize, listener: Receiver<H256>) -> Self {
114        Self::Immediate(ImmediateBlockSealer {
115            max_transactions,
116            rx: listener.fuse(),
117        })
118    }
119
120    pub fn fixed_time(max_transactions: usize, block_time: Duration) -> Self {
121        Self::FixedTime(FixedTimeBlockSealer::new(max_transactions, block_time))
122    }
123
124    pub fn poll(&mut self, pool: &TxPool, cx: &mut Context<'_>) -> Poll<TxBatch> {
125        match self {
126            BlockSealerMode::Noop => Poll::Pending,
127            BlockSealerMode::Immediate(immediate) => immediate.poll(pool, cx),
128            BlockSealerMode::FixedTime(fixed) => fixed.poll(pool, cx),
129        }
130    }
131}
132
133#[derive(Debug)]
134pub struct ImmediateBlockSealer {
135    /// Maximum number of transactions to include in a block.
136    max_transactions: usize,
137    /// Receives hashes of new transactions.
138    rx: Fuse<Receiver<H256>>,
139}
140
141impl ImmediateBlockSealer {
142    pub fn poll(&mut self, pool: &TxPool, cx: &mut Context<'_>) -> Poll<TxBatch> {
143        match pool.take_uniform(self.max_transactions) {
144            Some(tx_batch) => Poll::Ready(tx_batch),
145            None => {
146                let mut has_new_txs = false;
147                // Yield until new transactions are available in the pool
148                while let Poll::Ready(Some(_hash)) = Pin::new(&mut self.rx).poll_next(cx) {
149                    has_new_txs = true;
150                }
151
152                if has_new_txs {
153                    self.poll(pool, cx)
154                } else {
155                    Poll::Pending
156                }
157            }
158        }
159    }
160}
161
162#[derive(Debug)]
163pub struct FixedTimeBlockSealer {
164    /// Maximum number of transactions to include in a block.
165    max_transactions: usize,
166    /// The interval when a block should be sealed.
167    interval: Interval,
168}
169
170impl FixedTimeBlockSealer {
171    pub fn new(max_transactions: usize, block_time: Duration) -> Self {
172        let start = tokio::time::Instant::now() + block_time;
173        let mut interval = tokio::time::interval_at(start, block_time);
174        // Avoid shortening interval if a tick was missed
175        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
176        Self {
177            max_transactions,
178            interval,
179        }
180    }
181
182    pub fn poll(&mut self, pool: &TxPool, cx: &mut Context<'_>) -> Poll<TxBatch> {
183        if self.interval.poll_tick(cx).is_ready() {
184            // Return a batch even if the pool is empty, i.e. we produce empty blocks by design in
185            // fixed time mode.
186            let tx_batch = pool.take_uniform(self.max_transactions).unwrap_or(TxBatch {
187                impersonating: false,
188                txs: vec![],
189            });
190            return Poll::Ready(tx_batch);
191        }
192        Poll::Pending
193    }
194}
195
196#[cfg(test)]
197mod tests {
198    use crate::node::node_executor::testing::NodeExecutorTester;
199    use crate::node::pool::TxBatch;
200    use crate::node::sealer::BlockSealerMode;
201    use crate::node::{BlockSealer, ImpersonationManager, TxPool};
202    use anvil_zksync_types::TransactionOrder;
203    use std::time::Duration;
204    use tokio::task::JoinHandle;
205
206    struct BlockSealerTester {
207        _handle: JoinHandle<anyhow::Result<()>>,
208        node_executor_tester: NodeExecutorTester,
209    }
210
211    impl BlockSealerTester {
212        fn new(sealer_mode_fn: impl FnOnce(&TxPool) -> BlockSealerMode) -> (Self, TxPool) {
213            let (node_executor_tester, node_handle) = NodeExecutorTester::new();
214            let pool = TxPool::new(ImpersonationManager::default(), TransactionOrder::Fifo);
215            let (block_sealer, _) =
216                BlockSealer::new(sealer_mode_fn(&pool), pool.clone(), node_handle);
217            let _handle = tokio::spawn(block_sealer.run());
218
219            (
220                Self {
221                    _handle,
222                    node_executor_tester,
223                },
224                pool,
225            )
226        }
227    }
228
229    #[tokio::test]
230    async fn immediate_empty() -> anyhow::Result<()> {
231        let (tester, _pool) =
232            BlockSealerTester::new(|pool| BlockSealerMode::immediate(1000, pool.add_tx_listener()));
233
234        tester.node_executor_tester.expect_empty().await
235    }
236
237    #[tokio::test]
238    async fn immediate_one_tx() -> anyhow::Result<()> {
239        let (tester, pool) =
240            BlockSealerTester::new(|pool| BlockSealerMode::immediate(1000, pool.add_tx_listener()));
241
242        let [tx] = pool.populate::<1>();
243        tester
244            .node_executor_tester
245            .expect_seal_block(TxBatch {
246                impersonating: false,
247                txs: vec![tx],
248            })
249            .await
250    }
251
252    #[tokio::test]
253    async fn immediate_several_txs() -> anyhow::Result<()> {
254        let (tester, pool) =
255            BlockSealerTester::new(|pool| BlockSealerMode::immediate(1000, pool.add_tx_listener()));
256
257        let txs = pool.populate::<10>();
258        tester
259            .node_executor_tester
260            .expect_seal_block(TxBatch {
261                impersonating: false,
262                txs: txs.to_vec(),
263            })
264            .await
265    }
266
267    #[tokio::test]
268    async fn immediate_respect_max_txs() -> anyhow::Result<()> {
269        let (tester, pool) =
270            BlockSealerTester::new(|pool| BlockSealerMode::immediate(3, pool.add_tx_listener()));
271
272        let txs = pool.populate::<10>();
273        for txs in txs.chunks(3) {
274            tester
275                .node_executor_tester
276                .expect_seal_block(TxBatch {
277                    impersonating: false,
278                    txs: txs.to_vec(),
279                })
280                .await?;
281        }
282        Ok(())
283    }
284
285    #[tokio::test]
286    async fn immediate_gradual_txs() -> anyhow::Result<()> {
287        let (tester, pool) =
288            BlockSealerTester::new(|pool| BlockSealerMode::immediate(1000, pool.add_tx_listener()));
289
290        // Txs are added to the pool in small chunks
291        let txs0 = pool.populate::<3>();
292        let txs1 = pool.populate::<4>();
293        let txs2 = pool.populate::<5>();
294
295        let mut txs = txs0.to_vec();
296        txs.extend(txs1);
297        txs.extend(txs2);
298
299        tester
300            .node_executor_tester
301            .expect_seal_block(TxBatch {
302                impersonating: false,
303                txs,
304            })
305            .await?;
306
307        // Txs added after the first poll should be available for sealing
308        let txs = pool.populate::<10>().to_vec();
309        tester
310            .node_executor_tester
311            .expect_seal_block(TxBatch {
312                impersonating: false,
313                txs,
314            })
315            .await
316    }
317
318    #[tokio::test]
319    async fn fixed_time_very_long() -> anyhow::Result<()> {
320        let (tester, _pool) = BlockSealerTester::new(|_| {
321            BlockSealerMode::fixed_time(1000, Duration::from_secs(10000))
322        });
323
324        tester.node_executor_tester.expect_empty().await
325    }
326
327    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
328    async fn fixed_time_seal_empty() -> anyhow::Result<()> {
329        let (tester, _pool) = BlockSealerTester::new(|_| {
330            BlockSealerMode::fixed_time(1000, Duration::from_millis(100))
331        });
332
333        // Sleep enough time to produce exactly 1 block
334        tokio::time::sleep(Duration::from_millis(150)).await;
335
336        // Sealer should have sealed exactly one empty block by now
337        tester
338            .node_executor_tester
339            .expect_seal_block_immediate(TxBatch {
340                impersonating: false,
341                txs: vec![],
342            })
343            .await?;
344        tester.node_executor_tester.expect_empty_immediate().await?;
345
346        // Sleep enough time to produce one more block
347        tokio::time::sleep(Duration::from_millis(150)).await;
348
349        // Next block should be sealable
350        tester
351            .node_executor_tester
352            .expect_seal_block_immediate(TxBatch {
353                impersonating: false,
354                txs: vec![],
355            })
356            .await
357    }
358
359    #[tokio::test]
360    async fn fixed_time_seal_with_txs() -> anyhow::Result<()> {
361        let (tester, pool) = BlockSealerTester::new(|_| {
362            BlockSealerMode::fixed_time(1000, Duration::from_millis(100))
363        });
364
365        let txs = pool.populate::<3>();
366
367        // Sleep enough time to produce one block
368        tokio::time::sleep(Duration::from_millis(150)).await;
369
370        tester
371            .node_executor_tester
372            .expect_seal_block_immediate(TxBatch {
373                impersonating: false,
374                txs: txs.to_vec(),
375            })
376            .await
377    }
378
379    #[tokio::test]
380    async fn fixed_time_respect_max_txs() -> anyhow::Result<()> {
381        let (tester, pool) =
382            BlockSealerTester::new(|_| BlockSealerMode::fixed_time(3, Duration::from_millis(100)));
383
384        let txs = pool.populate::<10>();
385
386        for txs in txs.chunks(3) {
387            // Sleep enough time to produce one block
388            tokio::time::sleep(Duration::from_millis(150)).await;
389
390            tester
391                .node_executor_tester
392                .expect_seal_block_immediate(TxBatch {
393                    impersonating: false,
394                    txs: txs.to_vec(),
395                })
396                .await?;
397        }
398
399        Ok(())
400    }
401}