1use super::inner::node_executor::NodeExecutorHandle;
2use super::pool::{TxBatch, TxPool};
3use futures::channel::mpsc::Receiver;
4use futures::stream::{Fuse, StreamExt};
5use futures::task::AtomicWaker;
6use futures::Stream;
7use std::pin::Pin;
8use std::sync::{Arc, RwLock};
9use std::task::{Context, Poll};
10use std::time::Duration;
11use tokio::time::{Interval, MissedTickBehavior};
12use zksync_types::H256;
13
14pub struct BlockSealer {
18 state: BlockSealerState,
20 pool: TxPool,
22 node_handle: NodeExecutorHandle,
24}
25
26impl BlockSealer {
27 pub fn new(
28 mode: BlockSealerMode,
29 pool: TxPool,
30 node_handle: NodeExecutorHandle,
31 ) -> (Self, BlockSealerState) {
32 let state = BlockSealerState {
33 mode: Arc::new(RwLock::new(mode)),
34 waker: Arc::new(AtomicWaker::new()),
35 };
36 (
37 Self {
38 state: state.clone(),
39 pool,
40 node_handle,
41 },
42 state,
43 )
44 }
45
46 pub async fn run(self) -> anyhow::Result<()> {
47 loop {
48 tracing::debug!("polling for a new tx batch");
49 let tx_batch = futures::future::poll_fn(|cx| {
50 self.state.waker.register(cx.waker());
52 let mut mode = self
53 .state
54 .mode
55 .write()
56 .expect("BlockSealer lock is poisoned");
57 match &mut *mode {
58 BlockSealerMode::Noop => Poll::Pending,
59 BlockSealerMode::Immediate(immediate) => immediate.poll(&self.pool, cx),
60 BlockSealerMode::FixedTime(fixed) => fixed.poll(&self.pool, cx),
61 }
62 })
63 .await;
64 tracing::debug!(
65 impersonating = tx_batch.impersonating,
66 txs = tx_batch.txs.len(),
67 "new tx batch found"
68 );
69 self.node_handle.seal_block(tx_batch).await?;
70 }
71 }
72}
73
74#[derive(Clone, Debug)]
75pub struct BlockSealerState {
76 mode: Arc<RwLock<BlockSealerMode>>,
78 waker: Arc<AtomicWaker>,
80}
81
82impl BlockSealerState {
83 pub fn is_immediate(&self) -> bool {
84 matches!(
85 *self.mode.read().expect("BlockSealer lock is poisoned"),
86 BlockSealerMode::Immediate(_)
87 )
88 }
89
90 pub fn set_mode(&self, mode: BlockSealerMode) {
91 *self.mode.write().expect("BlockSealer lock is poisoned") = mode;
92 self.waker.wake();
94 }
95}
96
97#[derive(Debug)]
99pub enum BlockSealerMode {
100 Noop,
102 Immediate(ImmediateBlockSealer),
104 FixedTime(FixedTimeBlockSealer),
106}
107
108impl BlockSealerMode {
109 pub fn noop() -> Self {
110 Self::Noop
111 }
112
113 pub fn immediate(max_transactions: usize, listener: Receiver<H256>) -> Self {
114 Self::Immediate(ImmediateBlockSealer {
115 max_transactions,
116 rx: listener.fuse(),
117 })
118 }
119
120 pub fn fixed_time(max_transactions: usize, block_time: Duration) -> Self {
121 Self::FixedTime(FixedTimeBlockSealer::new(max_transactions, block_time))
122 }
123
124 pub fn poll(&mut self, pool: &TxPool, cx: &mut Context<'_>) -> Poll<TxBatch> {
125 match self {
126 BlockSealerMode::Noop => Poll::Pending,
127 BlockSealerMode::Immediate(immediate) => immediate.poll(pool, cx),
128 BlockSealerMode::FixedTime(fixed) => fixed.poll(pool, cx),
129 }
130 }
131}
132
133#[derive(Debug)]
134pub struct ImmediateBlockSealer {
135 max_transactions: usize,
137 rx: Fuse<Receiver<H256>>,
139}
140
141impl ImmediateBlockSealer {
142 pub fn poll(&mut self, pool: &TxPool, cx: &mut Context<'_>) -> Poll<TxBatch> {
143 match pool.take_uniform(self.max_transactions) {
144 Some(tx_batch) => Poll::Ready(tx_batch),
145 None => {
146 let mut has_new_txs = false;
147 while let Poll::Ready(Some(_hash)) = Pin::new(&mut self.rx).poll_next(cx) {
149 has_new_txs = true;
150 }
151
152 if has_new_txs {
153 self.poll(pool, cx)
154 } else {
155 Poll::Pending
156 }
157 }
158 }
159 }
160}
161
162#[derive(Debug)]
163pub struct FixedTimeBlockSealer {
164 max_transactions: usize,
166 interval: Interval,
168}
169
170impl FixedTimeBlockSealer {
171 pub fn new(max_transactions: usize, block_time: Duration) -> Self {
172 let start = tokio::time::Instant::now() + block_time;
173 let mut interval = tokio::time::interval_at(start, block_time);
174 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
176 Self {
177 max_transactions,
178 interval,
179 }
180 }
181
182 pub fn poll(&mut self, pool: &TxPool, cx: &mut Context<'_>) -> Poll<TxBatch> {
183 if self.interval.poll_tick(cx).is_ready() {
184 let tx_batch = pool.take_uniform(self.max_transactions).unwrap_or(TxBatch {
187 impersonating: false,
188 txs: vec![],
189 });
190 return Poll::Ready(tx_batch);
191 }
192 Poll::Pending
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use crate::node::node_executor::testing::NodeExecutorTester;
199 use crate::node::pool::TxBatch;
200 use crate::node::sealer::BlockSealerMode;
201 use crate::node::{BlockSealer, ImpersonationManager, TxPool};
202 use anvil_zksync_types::TransactionOrder;
203 use std::time::Duration;
204 use tokio::task::JoinHandle;
205
206 struct BlockSealerTester {
207 _handle: JoinHandle<anyhow::Result<()>>,
208 node_executor_tester: NodeExecutorTester,
209 }
210
211 impl BlockSealerTester {
212 fn new(sealer_mode_fn: impl FnOnce(&TxPool) -> BlockSealerMode) -> (Self, TxPool) {
213 let (node_executor_tester, node_handle) = NodeExecutorTester::new();
214 let pool = TxPool::new(ImpersonationManager::default(), TransactionOrder::Fifo);
215 let (block_sealer, _) =
216 BlockSealer::new(sealer_mode_fn(&pool), pool.clone(), node_handle);
217 let _handle = tokio::spawn(block_sealer.run());
218
219 (
220 Self {
221 _handle,
222 node_executor_tester,
223 },
224 pool,
225 )
226 }
227 }
228
229 #[tokio::test]
230 async fn immediate_empty() -> anyhow::Result<()> {
231 let (tester, _pool) =
232 BlockSealerTester::new(|pool| BlockSealerMode::immediate(1000, pool.add_tx_listener()));
233
234 tester.node_executor_tester.expect_empty().await
235 }
236
237 #[tokio::test]
238 async fn immediate_one_tx() -> anyhow::Result<()> {
239 let (tester, pool) =
240 BlockSealerTester::new(|pool| BlockSealerMode::immediate(1000, pool.add_tx_listener()));
241
242 let [tx] = pool.populate::<1>();
243 tester
244 .node_executor_tester
245 .expect_seal_block(TxBatch {
246 impersonating: false,
247 txs: vec![tx],
248 })
249 .await
250 }
251
252 #[tokio::test]
253 async fn immediate_several_txs() -> anyhow::Result<()> {
254 let (tester, pool) =
255 BlockSealerTester::new(|pool| BlockSealerMode::immediate(1000, pool.add_tx_listener()));
256
257 let txs = pool.populate::<10>();
258 tester
259 .node_executor_tester
260 .expect_seal_block(TxBatch {
261 impersonating: false,
262 txs: txs.to_vec(),
263 })
264 .await
265 }
266
267 #[tokio::test]
268 async fn immediate_respect_max_txs() -> anyhow::Result<()> {
269 let (tester, pool) =
270 BlockSealerTester::new(|pool| BlockSealerMode::immediate(3, pool.add_tx_listener()));
271
272 let txs = pool.populate::<10>();
273 for txs in txs.chunks(3) {
274 tester
275 .node_executor_tester
276 .expect_seal_block(TxBatch {
277 impersonating: false,
278 txs: txs.to_vec(),
279 })
280 .await?;
281 }
282 Ok(())
283 }
284
285 #[tokio::test]
286 async fn immediate_gradual_txs() -> anyhow::Result<()> {
287 let (tester, pool) =
288 BlockSealerTester::new(|pool| BlockSealerMode::immediate(1000, pool.add_tx_listener()));
289
290 let txs0 = pool.populate::<3>();
292 let txs1 = pool.populate::<4>();
293 let txs2 = pool.populate::<5>();
294
295 let mut txs = txs0.to_vec();
296 txs.extend(txs1);
297 txs.extend(txs2);
298
299 tester
300 .node_executor_tester
301 .expect_seal_block(TxBatch {
302 impersonating: false,
303 txs,
304 })
305 .await?;
306
307 let txs = pool.populate::<10>().to_vec();
309 tester
310 .node_executor_tester
311 .expect_seal_block(TxBatch {
312 impersonating: false,
313 txs,
314 })
315 .await
316 }
317
318 #[tokio::test]
319 async fn fixed_time_very_long() -> anyhow::Result<()> {
320 let (tester, _pool) = BlockSealerTester::new(|_| {
321 BlockSealerMode::fixed_time(1000, Duration::from_secs(10000))
322 });
323
324 tester.node_executor_tester.expect_empty().await
325 }
326
327 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
328 async fn fixed_time_seal_empty() -> anyhow::Result<()> {
329 let (tester, _pool) = BlockSealerTester::new(|_| {
330 BlockSealerMode::fixed_time(1000, Duration::from_millis(100))
331 });
332
333 tokio::time::sleep(Duration::from_millis(150)).await;
335
336 tester
338 .node_executor_tester
339 .expect_seal_block_immediate(TxBatch {
340 impersonating: false,
341 txs: vec![],
342 })
343 .await?;
344 tester.node_executor_tester.expect_empty_immediate().await?;
345
346 tokio::time::sleep(Duration::from_millis(150)).await;
348
349 tester
351 .node_executor_tester
352 .expect_seal_block_immediate(TxBatch {
353 impersonating: false,
354 txs: vec![],
355 })
356 .await
357 }
358
359 #[tokio::test]
360 async fn fixed_time_seal_with_txs() -> anyhow::Result<()> {
361 let (tester, pool) = BlockSealerTester::new(|_| {
362 BlockSealerMode::fixed_time(1000, Duration::from_millis(100))
363 });
364
365 let txs = pool.populate::<3>();
366
367 tokio::time::sleep(Duration::from_millis(150)).await;
369
370 tester
371 .node_executor_tester
372 .expect_seal_block_immediate(TxBatch {
373 impersonating: false,
374 txs: txs.to_vec(),
375 })
376 .await
377 }
378
379 #[tokio::test]
380 async fn fixed_time_respect_max_txs() -> anyhow::Result<()> {
381 let (tester, pool) =
382 BlockSealerTester::new(|_| BlockSealerMode::fixed_time(3, Duration::from_millis(100)));
383
384 let txs = pool.populate::<10>();
385
386 for txs in txs.chunks(3) {
387 tokio::time::sleep(Duration::from_millis(150)).await;
389
390 tester
391 .node_executor_tester
392 .expect_seal_block_immediate(TxBatch {
393 impersonating: false,
394 txs: txs.to_vec(),
395 })
396 .await?;
397 }
398
399 Ok(())
400 }
401}