anvil_zksync_core/node/batch/
factory.rs

1use super::executor::{Command, MainBatchExecutor};
2use super::shared::Sealed;
3use crate::bootloader_debug::{BootloaderDebug, BootloaderDebugTracer};
4use crate::deps::InMemoryStorage;
5use crate::node::boojumos::BoojumOsVM;
6use crate::node::traces::call_error::CallErrorTracer;
7use anvil_zksync_config::types::BoojumConfig;
8use anyhow::Context as _;
9use once_cell::sync::OnceCell;
10use std::sync::RwLock;
11use std::{fmt, marker::PhantomData, rc::Rc, sync::Arc};
12use tokio::sync::mpsc;
13use zksync_multivm::interface::{InspectExecutionMode, VmExecutionResultAndLogs};
14use zksync_multivm::{
15    interface::{
16        executor::{BatchExecutor, BatchExecutorFactory},
17        pubdata::PubdataBuilder,
18        storage::{ReadStorage, StoragePtr, StorageView},
19        utils::{DivergenceHandler, ShadowMut},
20        BatchTransactionExecutionResult, Call, ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv,
21        L2BlockEnv, SystemEnv, VmFactory, VmInterface, VmInterfaceHistoryEnabled,
22    },
23    is_supported_by_fast_vm,
24    pubdata_builders::pubdata_params_to_builder,
25    tracers::CallTracer,
26    vm_fast,
27    vm_fast::FastValidationTracer,
28    vm_latest::HistoryEnabled,
29    FastVmInstance, LegacyVmInstance, MultiVmTracer,
30};
31use zksync_types::{commitment::PubdataParams, vm::FastVmMode, Transaction};
32
33#[doc(hidden)]
34pub trait CallTracingTracer: vm_fast::interface::Tracer + Default {
35    fn into_traces(self) -> Vec<Call>;
36}
37
38impl CallTracingTracer for () {
39    fn into_traces(self) -> Vec<Call> {
40        vec![]
41    }
42}
43
44impl CallTracingTracer for vm_fast::CallTracer {
45    fn into_traces(self) -> Vec<Call> {
46        self.into_result()
47    }
48}
49
50/// Encapsulates a tracer used during batch processing. Currently supported tracers are `()` (no-op) and [`TraceCalls`].
51///
52/// All members of this trait are implementation details.
53pub trait BatchTracer: fmt::Debug + 'static + Send + Sealed {
54    /// True if call tracing is enabled. Used by legacy VMs which enable / disable call tracing dynamically.
55    #[doc(hidden)]
56    const TRACE_CALLS: bool;
57    /// Tracer for the fast VM.
58    #[doc(hidden)]
59    type Fast: CallTracingTracer;
60}
61
62impl Sealed for () {}
63
64/// No-op implementation that doesn't trace anything.
65impl BatchTracer for () {
66    const TRACE_CALLS: bool = false;
67    type Fast = ();
68}
69
70/// [`BatchTracer`] implementation tracing calls (returned in [`BatchTransactionExecutionResult`]s).
71#[derive(Debug)]
72pub struct TraceCalls(());
73
74impl Sealed for TraceCalls {}
75
76impl BatchTracer for TraceCalls {
77    const TRACE_CALLS: bool = true;
78    type Fast = vm_fast::CallTracer;
79}
80
81/// The default implementation of [`BatchExecutorFactory`].
82/// Creates real batch executors which maintain the VM (as opposed to the test factories which don't use the VM).
83#[derive(Debug, Clone)]
84pub struct MainBatchExecutorFactory<Tr> {
85    /// Whether batch executor would allow transactions with bytecode that cannot be compressed.
86    /// For new blocks, bytecode compression is mandatory -- if bytecode compression is not supported,
87    /// the transaction will be rejected.
88    /// Note that this flag, if set to `true`, is strictly more permissive than if set to `false`. It means
89    /// that in cases where the node is expected to process any transactions processed by the sequencer
90    /// regardless of its configuration, this flag should be set to `true`.
91    enforced_bytecode_compression: bool,
92    fast_vm_mode: FastVmMode,
93    skip_signature_verification: bool,
94    divergence_handler: Option<DivergenceHandler>,
95    legacy_bootloader_debug_result: Arc<RwLock<eyre::Result<BootloaderDebug, String>>>,
96    boojum: BoojumConfig,
97    _tracer: PhantomData<Tr>,
98}
99
100impl<Tr: BatchTracer> MainBatchExecutorFactory<Tr> {
101    pub fn new(
102        enforced_bytecode_compression: bool,
103        legacy_bootloader_debug_result: Arc<RwLock<eyre::Result<BootloaderDebug, String>>>,
104        boojum: BoojumConfig,
105    ) -> Self {
106        Self {
107            enforced_bytecode_compression,
108            fast_vm_mode: FastVmMode::Old,
109            skip_signature_verification: false,
110            divergence_handler: None,
111            legacy_bootloader_debug_result,
112            boojum,
113            _tracer: PhantomData,
114        }
115    }
116
117    /// Custom method (not present in zksync-era) that intentionally leaks [`MainBatchExecutor`]
118    /// implementation to enable the usage of [`MainBatchExecutor::bootloader`].
119    ///
120    /// To be deleted once we stop sealing batches on every block.
121    pub(crate) fn init_main_batch<S: ReadStorage + Send + 'static>(
122        &mut self,
123        storage: S,
124        l1_batch_params: L1BatchEnv,
125        system_env: SystemEnv,
126        pubdata_params: PubdataParams,
127        iterable_storage: Option<InMemoryStorage>,
128    ) -> MainBatchExecutor<S> {
129        // Since we process `BatchExecutor` commands one-by-one (the next command is never enqueued
130        // until a previous command is processed), capacity 1 is enough for the commands channel.
131        let (commands_sender, commands_receiver) = mpsc::channel(1);
132        let executor = CommandReceiver {
133            enforced_bytecode_compression: self.enforced_bytecode_compression,
134            fast_vm_mode: self.fast_vm_mode,
135            boojum: self.boojum.clone(),
136            skip_signature_verification: self.skip_signature_verification,
137            divergence_handler: self.divergence_handler.clone(),
138            commands: commands_receiver,
139            legacy_bootloader_debug_result: self.legacy_bootloader_debug_result.clone(),
140            _storage: PhantomData,
141            _tracer: PhantomData::<Tr>,
142        };
143
144        let handle = tokio::task::spawn_blocking(move || {
145            executor.run(
146                storage,
147                l1_batch_params,
148                system_env,
149                pubdata_params_to_builder(pubdata_params),
150                iterable_storage,
151            )
152        });
153        MainBatchExecutor::new(handle, commands_sender)
154    }
155}
156
157impl<S: ReadStorage + Send + 'static, Tr: BatchTracer> BatchExecutorFactory<S>
158    for MainBatchExecutorFactory<Tr>
159{
160    fn init_batch(
161        &mut self,
162        storage: S,
163        l1_batch_params: L1BatchEnv,
164        system_env: SystemEnv,
165        pubdata_params: PubdataParams,
166    ) -> Box<dyn BatchExecutor<S>> {
167        Box::new(self.init_main_batch(storage, l1_batch_params, system_env, pubdata_params, None))
168    }
169}
170
171#[derive(Debug)]
172#[allow(clippy::large_enum_variant)]
173enum BatchVm<S: ReadStorage, Tr: BatchTracer> {
174    Legacy(LegacyVmInstance<S, HistoryEnabled>),
175    Fast(FastVmInstance<S, Tr::Fast>),
176    BoojumOS(BoojumOsVM<StorageView<S>, HistoryEnabled>),
177}
178
179macro_rules! dispatch_batch_vm {
180    ($self:ident.$function:ident($($params:tt)*)) => {
181        match $self {
182            Self::Legacy(vm) => vm.$function($($params)*),
183            Self::Fast(vm) => vm.$function($($params)*),
184            Self::BoojumOS(vm) => vm.$function($($params)*),
185        }
186    };
187}
188
189impl<S: ReadStorage, Tr: BatchTracer> BatchVm<S, Tr> {
190    fn new(
191        l1_batch_env: L1BatchEnv,
192        system_env: SystemEnv,
193        storage_ptr: StoragePtr<StorageView<S>>,
194        mode: FastVmMode,
195        boojum: &BoojumConfig,
196        all_values: Option<InMemoryStorage>,
197    ) -> Self {
198        if boojum.use_boojum {
199            return Self::BoojumOS(BoojumOsVM::new(
200                l1_batch_env,
201                system_env,
202                storage_ptr,
203                &all_values.unwrap(),
204                boojum,
205            ));
206        }
207        if !is_supported_by_fast_vm(system_env.version) {
208            return Self::Legacy(LegacyVmInstance::new(l1_batch_env, system_env, storage_ptr));
209        }
210
211        match mode {
212            FastVmMode::Old => {
213                Self::Legacy(LegacyVmInstance::new(l1_batch_env, system_env, storage_ptr))
214            }
215            FastVmMode::New => {
216                Self::Fast(FastVmInstance::fast(l1_batch_env, system_env, storage_ptr))
217            }
218            FastVmMode::Shadow => Self::Fast(FastVmInstance::shadowed(
219                l1_batch_env,
220                system_env,
221                storage_ptr,
222            )),
223        }
224    }
225
226    fn start_new_l2_block(&mut self, l2_block: L2BlockEnv) {
227        dispatch_batch_vm!(self.start_new_l2_block(l2_block));
228    }
229
230    fn finish_batch(&mut self, pubdata_builder: Rc<dyn PubdataBuilder>) -> FinishedL1Batch {
231        dispatch_batch_vm!(self.finish_batch(pubdata_builder))
232    }
233
234    fn bootloader(&mut self) -> VmExecutionResultAndLogs {
235        dispatch_batch_vm!(self.inspect(&mut Default::default(), InspectExecutionMode::Bootloader))
236    }
237
238    fn make_snapshot(&mut self) {
239        dispatch_batch_vm!(self.make_snapshot());
240    }
241
242    fn rollback_to_the_latest_snapshot(&mut self) {
243        dispatch_batch_vm!(self.rollback_to_the_latest_snapshot());
244    }
245
246    fn pop_snapshot_no_rollback(&mut self) {
247        dispatch_batch_vm!(self.pop_snapshot_no_rollback());
248    }
249
250    fn inspect_transaction(
251        &mut self,
252        tx: Transaction,
253        with_compression: bool,
254        legacy_bootloader_debug_result: Arc<RwLock<eyre::Result<BootloaderDebug, String>>>,
255    ) -> BatchTransactionExecutionResult {
256        let legacy_tracer_result = Arc::new(OnceCell::default());
257        let legacy_error_flags_result = Arc::new(OnceCell::new());
258        let mut legacy_tracer = if Tr::TRACE_CALLS {
259            vec![CallTracer::new(legacy_tracer_result.clone()).into_tracer_pointer()]
260        } else {
261            vec![]
262        };
263        legacy_tracer
264            .push(BootloaderDebugTracer::new(legacy_bootloader_debug_result).into_tracer_pointer());
265        legacy_tracer
266            .push(CallErrorTracer::new(legacy_error_flags_result.clone()).into_tracer_pointer());
267        let mut legacy_tracer = legacy_tracer.into();
268        let mut fast_traces = vec![];
269
270        let (compression_result, tx_result) = match self {
271            Self::Legacy(vm) => vm.inspect_transaction_with_bytecode_compression(
272                &mut legacy_tracer,
273                tx,
274                with_compression,
275            ),
276            Self::BoojumOS(vm) => {
277                vm.push_transaction(tx);
278                let res = vm.inspect(&mut legacy_tracer.into(), InspectExecutionMode::OneTx);
279
280                (Ok((&[]).into()), res)
281            }
282
283            Self::Fast(vm) => {
284                let mut tracer = (
285                    legacy_tracer.into(),
286                    (Tr::Fast::default(), FastValidationTracer::default()),
287                );
288                let res = vm.inspect_transaction_with_bytecode_compression(
289                    &mut tracer,
290                    tx,
291                    with_compression,
292                );
293                let (_, (call_tracer, _)) = tracer;
294                fast_traces = call_tracer.into_traces();
295                res
296            }
297        };
298
299        let compressed_bytecodes = compression_result.map(drop);
300        let legacy_traces = Arc::try_unwrap(legacy_tracer_result)
301            .expect("failed extracting call traces")
302            .take()
303            .unwrap_or_default();
304        let call_traces = match self {
305            Self::Legacy(_) => legacy_traces,
306            Self::BoojumOS(_) => legacy_traces,
307            Self::Fast(FastVmInstance::Fast(_)) => fast_traces,
308            Self::Fast(FastVmInstance::Shadowed(vm)) => {
309                vm.get_custom_mut("call_traces", |r| match r {
310                    ShadowMut::Main(_) => legacy_traces.as_slice(),
311                    ShadowMut::Shadow(_) => fast_traces.as_slice(),
312                });
313                fast_traces
314            }
315        };
316
317        BatchTransactionExecutionResult {
318            tx_result: Box::new(tx_result),
319            compression_result: compressed_bytecodes,
320            call_traces,
321        }
322    }
323}
324
325/// Implementation of the "primary" (non-test) batch executor.
326/// Upon launch, it initializes the VM object with provided block context and properties, and keeps invoking the commands
327/// sent to it one by one until the batch is finished.
328///
329/// One `CommandReceiver` can execute exactly one batch, so once the batch is sealed, a new `CommandReceiver` object must
330/// be constructed.
331#[derive(Debug)]
332struct CommandReceiver<S, Tr> {
333    enforced_bytecode_compression: bool,
334    fast_vm_mode: FastVmMode,
335    boojum: BoojumConfig,
336    skip_signature_verification: bool,
337    divergence_handler: Option<DivergenceHandler>,
338    commands: mpsc::Receiver<Command>,
339    legacy_bootloader_debug_result: Arc<RwLock<eyre::Result<BootloaderDebug, String>>>,
340    _storage: PhantomData<S>,
341    _tracer: PhantomData<Tr>,
342}
343
344impl<S: ReadStorage + 'static, Tr: BatchTracer> CommandReceiver<S, Tr> {
345    pub(super) fn run(
346        mut self,
347        storage: S,
348        l1_batch_params: L1BatchEnv,
349        system_env: SystemEnv,
350        pubdata_builder: Rc<dyn PubdataBuilder>,
351        all_values: Option<InMemoryStorage>,
352    ) -> anyhow::Result<StorageView<S>> {
353        tracing::info!("Starting executing L1 batch #{}", &l1_batch_params.number);
354        if self.boojum.use_boojum {
355            tracing::info!("Using BoojumOS VM");
356        }
357
358        let storage_view = StorageView::new(storage).to_rc_ptr();
359        let mut vm = BatchVm::<S, Tr>::new(
360            l1_batch_params,
361            system_env,
362            storage_view.clone(),
363            self.fast_vm_mode,
364            &self.boojum,
365            all_values,
366        );
367
368        if self.skip_signature_verification {
369            if let BatchVm::Fast(vm) = &mut vm {
370                vm.skip_signature_verification();
371            }
372        }
373        let mut batch_finished = false;
374
375        if let BatchVm::Fast(FastVmInstance::Shadowed(shadowed)) = &mut vm {
376            if let Some(handler) = self.divergence_handler.take() {
377                shadowed.set_divergence_handler(handler);
378            }
379        }
380
381        while let Some(cmd) = self.commands.blocking_recv() {
382            match cmd {
383                Command::ExecuteTx(tx, resp) => {
384                    let tx_hash = tx.hash();
385                    let result = self.execute_tx(*tx, &mut vm).with_context(|| {
386                        format!("fatal error executing transaction {tx_hash:?}")
387                    })?;
388
389                    if resp.send(result).is_err() {
390                        break;
391                    }
392                }
393                Command::RollbackLastTx(resp) => {
394                    self.rollback_last_tx(&mut vm);
395                    if resp.send(()).is_err() {
396                        break;
397                    }
398                }
399                Command::StartNextL2Block(l2_block_env, resp) => {
400                    vm.start_new_l2_block(l2_block_env);
401                    if resp.send(()).is_err() {
402                        break;
403                    }
404                }
405                Command::FinishBatch(resp) => {
406                    let vm_block_result = self.finish_batch(&mut vm, pubdata_builder)?;
407                    if resp.send(vm_block_result).is_err() {
408                        break;
409                    }
410                    batch_finished = true;
411                    break;
412                }
413                Command::Bootloader(resp) => {
414                    let bootloader_result = self.bootloader(&mut vm)?;
415                    if resp.send(bootloader_result).is_err() {
416                        break;
417                    }
418                    batch_finished = true;
419                    break;
420                }
421            }
422        }
423
424        drop(vm);
425        let storage_view = Rc::into_inner(storage_view)
426            .context("storage view leaked")?
427            .into_inner();
428        if !batch_finished {
429            // State keeper can exit because of stop signal, so it's OK to exit mid-batch.
430            tracing::info!("State keeper exited with an unfinished L1 batch");
431        }
432        Ok(storage_view)
433    }
434
435    fn execute_tx(
436        &self,
437        transaction: Transaction,
438        vm: &mut BatchVm<S, Tr>,
439    ) -> anyhow::Result<BatchTransactionExecutionResult> {
440        // Executing a next transaction means that a previous transaction was either rolled back (in which case its snapshot
441        // was already removed), or that we build on top of it (in which case, it can be removed now).
442        vm.pop_snapshot_no_rollback();
443        // Save pre-execution VM snapshot.
444        vm.make_snapshot();
445
446        // Execute the transaction.
447        let result = if self.enforced_bytecode_compression {
448            self.execute_tx_in_vm(&transaction, vm)?
449        } else {
450            self.execute_tx_in_vm_with_optional_compression(&transaction, vm)?
451        };
452
453        Ok(result)
454    }
455
456    fn rollback_last_tx(&self, vm: &mut BatchVm<S, Tr>) {
457        vm.rollback_to_the_latest_snapshot();
458    }
459
460    fn finish_batch(
461        &self,
462        vm: &mut BatchVm<S, Tr>,
463        pubdata_builder: Rc<dyn PubdataBuilder>,
464    ) -> anyhow::Result<FinishedL1Batch> {
465        // The vm execution was paused right after the last transaction was executed.
466        // There is some post-processing work that the VM needs to do before the block is fully processed.
467        let result = vm.finish_batch(pubdata_builder);
468        anyhow::ensure!(
469            !result.block_tip_execution_result.result.is_failed(),
470            "VM must not fail when finalizing block: {:#?}",
471            result.block_tip_execution_result.result
472        );
473
474        Ok(result)
475    }
476
477    /// Attempts to execute transaction with or without bytecode compression.
478    /// If compression fails, the transaction will be re-executed without compression.
479    fn execute_tx_in_vm_with_optional_compression(
480        &self,
481        tx: &Transaction,
482        vm: &mut BatchVm<S, Tr>,
483    ) -> anyhow::Result<BatchTransactionExecutionResult> {
484        // Note, that the space where we can put the calldata for compressing transactions
485        // is limited and the transactions do not pay for taking it.
486        // In order to not let the accounts spam the space of compressed bytecodes with bytecodes
487        // that will not be published (e.g. due to out of gas), we use the following scheme:
488        // We try to execute the transaction with compressed bytecodes.
489        // If it fails and the compressed bytecodes have not been published,
490        // it means that there is no sense in polluting the space of compressed bytecodes,
491        // and so we re-execute the transaction, but without compression.
492
493        let res = vm.inspect_transaction(
494            tx.clone(),
495            true,
496            self.legacy_bootloader_debug_result.clone(),
497        );
498        if res.compression_result.is_ok() {
499            return Ok(BatchTransactionExecutionResult {
500                tx_result: res.tx_result,
501                compression_result: Ok(()),
502                call_traces: res.call_traces,
503            });
504        }
505
506        // Roll back to the snapshot just before the transaction execution taken in `Self::execute_tx()`
507        // and create a snapshot at the same VM state again.
508        vm.rollback_to_the_latest_snapshot();
509        vm.make_snapshot();
510
511        let res = vm.inspect_transaction(
512            tx.clone(),
513            false,
514            self.legacy_bootloader_debug_result.clone(),
515        );
516        res.compression_result
517            .context("compression failed when it wasn't applied")?;
518        Ok(BatchTransactionExecutionResult {
519            tx_result: res.tx_result,
520            compression_result: Ok(()),
521            call_traces: res.call_traces,
522        })
523    }
524
525    /// Attempts to execute transaction with mandatory bytecode compression.
526    /// If bytecode compression fails, the transaction will be rejected.
527    fn execute_tx_in_vm(
528        &self,
529        tx: &Transaction,
530        vm: &mut BatchVm<S, Tr>,
531    ) -> anyhow::Result<BatchTransactionExecutionResult> {
532        let res = vm.inspect_transaction(
533            tx.clone(),
534            true,
535            self.legacy_bootloader_debug_result.clone(),
536        );
537        if res.compression_result.is_ok() {
538            Ok(BatchTransactionExecutionResult {
539                tx_result: res.tx_result,
540                compression_result: Ok(()),
541                call_traces: res.call_traces,
542            })
543        } else {
544            // Transaction failed to publish bytecodes, we reject it so initiator doesn't pay fee.
545            let mut tx_result = res.tx_result;
546            tx_result.result = ExecutionResult::Halt {
547                reason: Halt::FailedToPublishCompressedBytecodes,
548            };
549            Ok(BatchTransactionExecutionResult {
550                tx_result,
551                compression_result: Ok(()),
552                call_traces: vec![],
553            })
554        }
555    }
556
557    fn bootloader(&self, vm: &mut BatchVm<S, Tr>) -> anyhow::Result<VmExecutionResultAndLogs> {
558        let result = vm.bootloader();
559        anyhow::ensure!(
560            !result.result.is_failed(),
561            "VM must not fail when running bootloader: {:#?}",
562            result.result
563        );
564
565        Ok(result)
566    }
567}