use super::executor::{Command, MainBatchExecutor};
use super::shared::Sealed;
use crate::bootloader_debug::{BootloaderDebug, BootloaderDebugTracer};
use crate::node::call_error_tracer::CallErrorTracer;
use anyhow::Context as _;
use once_cell::sync::OnceCell;
use std::sync::RwLock;
use std::{fmt, marker::PhantomData, rc::Rc, sync::Arc};
use tokio::sync::mpsc;
use zksync_multivm::interface::{InspectExecutionMode, VmExecutionResultAndLogs};
use zksync_multivm::{
interface::{
executor::{BatchExecutor, BatchExecutorFactory},
pubdata::PubdataBuilder,
storage::{ReadStorage, StoragePtr, StorageView},
utils::{DivergenceHandler, ShadowMut},
BatchTransactionExecutionResult, Call, ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv,
L2BlockEnv, SystemEnv, VmFactory, VmInterface, VmInterfaceHistoryEnabled,
},
is_supported_by_fast_vm,
pubdata_builders::pubdata_params_to_builder,
tracers::CallTracer,
vm_fast,
vm_fast::FastValidationTracer,
vm_latest::HistoryEnabled,
FastVmInstance, LegacyVmInstance, MultiVmTracer,
};
use zksync_types::{commitment::PubdataParams, vm::FastVmMode, Transaction};
/// Fast VM tracer whose collected data can be converted into a list of call traces.
///
/// Implementors must also be a `vm_fast::interface::Tracer` and constructible via `Default`;
/// the batch VM creates a fresh instance with `Default::default()` for every inspected
/// transaction.
#[doc(hidden)]
pub trait CallTracingTracer: vm_fast::interface::Tracer + Default {
/// Consumes the tracer and returns the call traces it holds.
fn into_traces(self) -> Vec<Call>;
}
// The unit tracer records nothing, so it has no call traces to report.
impl CallTracingTracer for () {
    /// Always returns an empty list of traces.
    fn into_traces(self) -> Vec<Call> {
        Vec::new()
    }
}
// Call tracing backed by the fast VM's own call tracer.
impl CallTracingTracer for vm_fast::CallTracer {
    /// Consumes the tracer and returns whatever `into_result()` yields.
    fn into_traces(self) -> Vec<Call> {
        Self::into_result(self)
    }
}
/// Marker trait selecting which tracing is performed while executing batch transactions.
///
/// The trait requires `Sealed` as a supertrait; the implementations visible in this file
/// are `()` and `TraceCalls`.
pub trait BatchTracer: fmt::Debug + 'static + Send + Sealed {
/// Whether a legacy `CallTracer` is attached when a transaction is inspected.
#[doc(hidden)]
const TRACE_CALLS: bool;
/// Tracer type instantiated (via `Default`) for the fast VM on every inspected transaction.
#[doc(hidden)]
type Fast: CallTracingTracer;
}
// `()` is the "no call tracing" tracer; it must be `Sealed` to implement `BatchTracer`.
impl Sealed for () {}
impl BatchTracer for () {
// No legacy call tracer is attached.
const TRACE_CALLS: bool = false;
// The unit fast tracer returns no call traces.
type Fast = ();
}
/// Marker tracer type that enables call tracing for batch transactions.
///
/// The private unit field prevents construction outside this module; the type is meant to
/// be used purely as a type parameter (e.g. `MainBatchExecutorFactory<TraceCalls>`).
///
/// `Clone` and `Copy` are derived so that generic containers which `#[derive(Clone)]` over
/// their tracer parameter (and therefore require `Tr: Clone`) remain cloneable when
/// instantiated with this marker.
#[derive(Debug, Clone, Copy)]
pub struct TraceCalls(());
// `TraceCalls` must be `Sealed` to implement `BatchTracer`.
impl Sealed for TraceCalls {}
impl BatchTracer for TraceCalls {
// A legacy call tracer is attached to every inspected transaction.
const TRACE_CALLS: bool = true;
// The fast VM collects call traces with its own call tracer.
type Fast = vm_fast::CallTracer;
}
/// Factory producing batch executors that run the VM on a dedicated blocking thread.
///
/// The `Tr` parameter is a `BatchTracer` marker that is only stored as `PhantomData`;
/// it selects which tracing is performed for each transaction.
#[derive(Debug, Clone)]
pub struct MainBatchExecutorFactory<Tr> {
/// When `true`, a transaction whose bytecode compression fails is halted instead of
/// being re-executed without compression.
enforced_bytecode_compression: bool,
/// Requested VM flavor; forwarded to the VM constructor for each batch.
fast_vm_mode: FastVmMode,
/// When `true` and the fast VM is in use, signature verification is skipped.
skip_signature_verification: bool,
/// Handler installed on the shadowed VM, if that VM flavor ends up being used.
divergence_handler: Option<DivergenceHandler>,
/// Shared slot handed to the bootloader debug tracer for every inspected transaction.
legacy_bootloader_debug_result: Arc<RwLock<eyre::Result<BootloaderDebug, String>>>,
/// Ties the factory to its tracer marker type without storing a value of it.
_tracer: PhantomData<Tr>,
}
impl<Tr: BatchTracer> MainBatchExecutorFactory<Tr> {
    /// Creates a factory with the given compression policy and bootloader debug slot.
    ///
    /// All remaining settings start at their defaults: the legacy VM (`FastVmMode::Old`),
    /// signature verification enabled, and no divergence handler.
    pub fn new(
        enforced_bytecode_compression: bool,
        legacy_bootloader_debug_result: Arc<RwLock<eyre::Result<BootloaderDebug, String>>>,
    ) -> Self {
        Self {
            fast_vm_mode: FastVmMode::Old,
            skip_signature_verification: false,
            divergence_handler: None,
            _tracer: PhantomData,
            enforced_bytecode_compression,
            legacy_bootloader_debug_result,
        }
    }

    /// Spawns the VM loop for a new batch on a blocking thread and returns the executor
    /// handle used to send it commands.
    ///
    /// The current factory settings are copied into the spawned `CommandReceiver`.
    pub(crate) fn init_main_batch<S: ReadStorage + Send + 'static>(
        &mut self,
        storage: S,
        l1_batch_params: L1BatchEnv,
        system_env: SystemEnv,
        pubdata_params: PubdataParams,
    ) -> MainBatchExecutor<S> {
        // Commands travel over a bounded channel with capacity 1.
        let (commands_sender, commands) = mpsc::channel(1);

        let receiver = CommandReceiver::<S, Tr> {
            commands,
            enforced_bytecode_compression: self.enforced_bytecode_compression,
            fast_vm_mode: self.fast_vm_mode,
            skip_signature_verification: self.skip_signature_verification,
            divergence_handler: self.divergence_handler.clone(),
            legacy_bootloader_debug_result: Arc::clone(&self.legacy_bootloader_debug_result),
            _storage: PhantomData,
            _tracer: PhantomData,
        };

        let handle = tokio::task::spawn_blocking(move || {
            // The builder is handed to `run` as an `Rc`, which cannot cross threads, so it
            // is created here, on the blocking thread itself.
            let pubdata_builder = pubdata_params_to_builder(pubdata_params);
            receiver.run(storage, l1_batch_params, system_env, pubdata_builder)
        });

        MainBatchExecutor::new(handle, commands_sender)
    }
}
impl<S: ReadStorage + Send + 'static, Tr: BatchTracer> BatchExecutorFactory<S>
    for MainBatchExecutorFactory<Tr>
{
    /// Starts a new batch via `init_main_batch` and returns the executor as a boxed
    /// trait object.
    fn init_batch(
        &mut self,
        storage: S,
        l1_batch_params: L1BatchEnv,
        system_env: SystemEnv,
        pubdata_params: PubdataParams,
    ) -> Box<dyn BatchExecutor<S>> {
        let executor = self.init_main_batch(storage, l1_batch_params, system_env, pubdata_params);
        Box::new(executor)
    }
}
/// VM used to execute a batch: either the legacy VM or one of the fast VM flavors.
#[derive(Debug)]
enum BatchVm<S: ReadStorage, Tr: BatchTracer> {
/// Legacy VM instance with history (snapshots) enabled.
Legacy(LegacyVmInstance<S, HistoryEnabled>),
/// Fast VM instance (plain or shadowed), parameterized by the tracer's fast-VM type.
Fast(FastVmInstance<S, Tr::Fast>),
}
/// Forwards a method call to whichever VM the `BatchVm` currently wraps.
///
/// Usage: `dispatch_batch_vm!(self.method(args...))`. The expansion matches on
/// `Self::Legacy` / `Self::Fast`, so it can only be used inside an `impl` of `BatchVm`,
/// and the method must exist on both wrapped VM types.
macro_rules! dispatch_batch_vm {
($self:ident.$function:ident($($params:tt)*)) => {
match $self {
Self::Legacy(vm) => vm.$function($($params)*),
Self::Fast(vm) => vm.$function($($params)*),
}
};
}
impl<S: ReadStorage, Tr: BatchTracer> BatchVm<S, Tr> {
    /// Builds the VM for a batch.
    ///
    /// If the protocol version in `system_env` is not supported by the fast VM, the legacy
    /// VM is used regardless of `mode`; otherwise `mode` picks the VM flavor.
    fn new(
        l1_batch_env: L1BatchEnv,
        system_env: SystemEnv,
        storage_ptr: StoragePtr<StorageView<S>>,
        mode: FastVmMode,
    ) -> Self {
        let effective_mode = if is_supported_by_fast_vm(system_env.version) {
            mode
        } else {
            FastVmMode::Old
        };

        match effective_mode {
            FastVmMode::Old => {
                Self::Legacy(LegacyVmInstance::new(l1_batch_env, system_env, storage_ptr))
            }
            FastVmMode::New => {
                Self::Fast(FastVmInstance::fast(l1_batch_env, system_env, storage_ptr))
            }
            FastVmMode::Shadow => {
                Self::Fast(FastVmInstance::shadowed(l1_batch_env, system_env, storage_ptr))
            }
        }
    }

    /// Starts a new L2 block in the wrapped VM.
    fn start_new_l2_block(&mut self, l2_block: L2BlockEnv) {
        dispatch_batch_vm!(self.start_new_l2_block(l2_block));
    }

    /// Finishes the batch in the wrapped VM and returns its result.
    fn finish_batch(&mut self, pubdata_builder: Rc<dyn PubdataBuilder>) -> FinishedL1Batch {
        dispatch_batch_vm!(self.finish_batch(pubdata_builder))
    }

    /// Runs the wrapped VM in `InspectExecutionMode::Bootloader` with a default tracer.
    fn bootloader(&mut self) -> VmExecutionResultAndLogs {
        dispatch_batch_vm!(self.inspect(&mut Default::default(), InspectExecutionMode::Bootloader))
    }

    /// Takes a snapshot of the wrapped VM.
    fn make_snapshot(&mut self) {
        dispatch_batch_vm!(self.make_snapshot());
    }

    /// Rolls the wrapped VM back to its latest snapshot.
    fn rollback_to_the_latest_snapshot(&mut self) {
        dispatch_batch_vm!(self.rollback_to_the_latest_snapshot());
    }

    /// Discards the latest snapshot of the wrapped VM without rolling back.
    fn pop_snapshot_no_rollback(&mut self) {
        dispatch_batch_vm!(self.pop_snapshot_no_rollback());
    }

    /// Executes `tx` in the wrapped VM with tracers attached and gathers the outcome.
    ///
    /// The legacy tracer set always contains the bootloader debug tracer (writing into
    /// `legacy_bootloader_debug_result`) and the call error tracer; a call tracer is added
    /// in front of them when `Tr::TRACE_CALLS` is set.
    ///
    /// # Panics
    ///
    /// Panics if the call-trace cell is still shared after the VM has run.
    fn inspect_transaction(
        &mut self,
        tx: Transaction,
        with_compression: bool,
        legacy_bootloader_debug_result: Arc<RwLock<eyre::Result<BootloaderDebug, String>>>,
    ) -> BatchTransactionExecutionResult {
        let legacy_calls_cell = Arc::new(OnceCell::default());
        let error_flags_cell = Arc::new(OnceCell::new());

        // Assemble the legacy tracers in a fixed order: calls (optional), debug, errors.
        let mut legacy_tracers = Vec::new();
        if Tr::TRACE_CALLS {
            legacy_tracers.push(CallTracer::new(legacy_calls_cell.clone()).into_tracer_pointer());
        }
        legacy_tracers
            .push(BootloaderDebugTracer::new(legacy_bootloader_debug_result).into_tracer_pointer());
        legacy_tracers.push(CallErrorTracer::new(error_flags_cell.clone()).into_tracer_pointer());
        let mut legacy_dispatcher = legacy_tracers.into();

        // NOTE: the legacy arm must stay first; it is what pins down the dispatcher type
        // that the fast arm then converts from.
        let ((compression_result, tx_result), fast_traces) = match self {
            Self::Legacy(vm) => {
                let outcome = vm.inspect_transaction_with_bytecode_compression(
                    &mut legacy_dispatcher,
                    tx,
                    with_compression,
                );
                (outcome, Vec::new())
            }
            Self::Fast(vm) => {
                let mut combined_tracer = (
                    legacy_dispatcher.into(),
                    (Tr::Fast::default(), FastValidationTracer::default()),
                );
                let outcome = vm.inspect_transaction_with_bytecode_compression(
                    &mut combined_tracer,
                    tx,
                    with_compression,
                );
                let (_, (fast_call_tracer, _)) = combined_tracer;
                (outcome, fast_call_tracer.into_traces())
            }
        };

        // Only success/failure of compression is reported; the payload is dropped.
        let compression_result = compression_result.map(|_| ());

        let mut legacy_cell =
            Arc::try_unwrap(legacy_calls_cell).expect("failed extracting call traces");
        let legacy_traces = legacy_cell.take().unwrap_or_default();

        let call_traces = match self {
            Self::Legacy(_) => legacy_traces,
            Self::Fast(FastVmInstance::Fast(_)) => fast_traces,
            Self::Fast(FastVmInstance::Shadowed(shadowed)) => {
                // Hand both trace lists to the shadowed VM under the "call_traces" label.
                shadowed.get_custom_mut("call_traces", |side| match side {
                    ShadowMut::Main(_) => legacy_traces.as_slice(),
                    ShadowMut::Shadow(_) => fast_traces.as_slice(),
                });
                fast_traces
            }
        };

        BatchTransactionExecutionResult {
            tx_result: Box::new(tx_result),
            compression_result,
            call_traces,
        }
    }
}
/// State of the VM loop that runs on a blocking thread and serves `Command`s for one batch.
///
/// `S` is the storage type and `Tr` the tracer marker; both are only stored as
/// `PhantomData`.
#[derive(Debug)]
struct CommandReceiver<S, Tr> {
/// When `true`, a transaction whose bytecode compression fails is halted instead of
/// being re-executed without compression.
enforced_bytecode_compression: bool,
/// Requested VM flavor, passed to `BatchVm::new`.
fast_vm_mode: FastVmMode,
/// When `true` and the fast VM is in use, signature verification is skipped.
skip_signature_verification: bool,
/// Handler moved into the shadowed VM when that VM flavor is used.
divergence_handler: Option<DivergenceHandler>,
/// Receiving end of the command channel.
commands: mpsc::Receiver<Command>,
/// Shared slot handed to the bootloader debug tracer for every inspected transaction.
legacy_bootloader_debug_result: Arc<RwLock<eyre::Result<BootloaderDebug, String>>>,
_storage: PhantomData<S>,
_tracer: PhantomData<Tr>,
}
impl<S: ReadStorage + 'static, Tr: BatchTracer> CommandReceiver<S, Tr> {
    /// Runs the command loop for one batch and returns the storage view once it ends.
    ///
    /// The loop stops when the command channel closes, when a response cannot be
    /// delivered, or after a `FinishBatch` / `Bootloader` command has been served.
    ///
    /// # Errors
    ///
    /// Fails if executing a transaction, finishing the batch, or running the bootloader
    /// fails, or if the storage view is still referenced after the VM has been dropped.
    pub(super) fn run(
        mut self,
        storage: S,
        l1_batch_params: L1BatchEnv,
        system_env: SystemEnv,
        pubdata_builder: Rc<dyn PubdataBuilder>,
    ) -> anyhow::Result<StorageView<S>> {
        tracing::info!("Starting executing L1 batch #{}", &l1_batch_params.number);

        let storage_view = StorageView::new(storage).to_rc_ptr();
        let mut vm = BatchVm::<S, Tr>::new(
            l1_batch_params,
            system_env,
            storage_view.clone(),
            self.fast_vm_mode,
        );

        if let (true, BatchVm::Fast(fast_vm)) = (self.skip_signature_verification, &mut vm) {
            fast_vm.skip_signature_verification();
        }

        // The handler is only taken out of `self` when a shadowed VM is actually in use.
        if let BatchVm::Fast(FastVmInstance::Shadowed(shadowed)) = &mut vm {
            if let Some(handler) = self.divergence_handler.take() {
                shadowed.set_divergence_handler(handler);
            }
        }

        let mut batch_finished = false;
        while let Some(cmd) = self.commands.blocking_recv() {
            // `delivered` is false when the requester is no longer listening for the response.
            let delivered = match cmd {
                Command::ExecuteTx(tx, resp) => {
                    let tx_hash = tx.hash();
                    let outcome = self.execute_tx(*tx, &mut vm).with_context(|| {
                        format!("fatal error executing transaction {tx_hash:?}")
                    })?;
                    resp.send(outcome).is_ok()
                }
                Command::RollbackLastTx(resp) => {
                    self.rollback_last_tx(&mut vm);
                    resp.send(()).is_ok()
                }
                Command::StartNextL2Block(l2_block_env, resp) => {
                    vm.start_new_l2_block(l2_block_env);
                    resp.send(()).is_ok()
                }
                // The two commands below are terminal: the loop ends whether or not the
                // response was delivered, and the batch only counts as finished if it was.
                Command::FinishBatch(resp) => {
                    let finished = self.finish_batch(&mut vm, pubdata_builder)?;
                    batch_finished = resp.send(finished).is_ok();
                    break;
                }
                Command::Bootloader(resp) => {
                    let bootloader_output = self.bootloader(&mut vm)?;
                    batch_finished = resp.send(bootloader_output).is_ok();
                    break;
                }
            };
            if !delivered {
                break;
            }
        }

        // Drop the VM first so that its reference to the storage view is released.
        drop(vm);
        let storage_view = Rc::into_inner(storage_view)
            .context("storage view leaked")?
            .into_inner();

        if !batch_finished {
            tracing::info!("State keeper exited with an unfinished L1 batch");
        }
        Ok(storage_view)
    }

    /// Executes a single transaction on top of a fresh VM snapshot.
    ///
    /// The previous snapshot is discarded without rollback and a new one is taken before
    /// execution, so the transaction can later be undone with `rollback_last_tx`.
    fn execute_tx(
        &self,
        transaction: Transaction,
        vm: &mut BatchVm<S, Tr>,
    ) -> anyhow::Result<BatchTransactionExecutionResult> {
        vm.pop_snapshot_no_rollback();
        vm.make_snapshot();

        if self.enforced_bytecode_compression {
            self.execute_tx_in_vm(&transaction, vm)
        } else {
            self.execute_tx_in_vm_with_optional_compression(&transaction, vm)
        }
    }

    /// Reverts the VM to the snapshot taken before the last executed transaction.
    fn rollback_last_tx(&self, vm: &mut BatchVm<S, Tr>) {
        vm.rollback_to_the_latest_snapshot();
    }

    /// Finishes the batch, failing if the block tip execution result is a failure.
    fn finish_batch(
        &self,
        vm: &mut BatchVm<S, Tr>,
        pubdata_builder: Rc<dyn PubdataBuilder>,
    ) -> anyhow::Result<FinishedL1Batch> {
        let finished = vm.finish_batch(pubdata_builder);
        if finished.block_tip_execution_result.result.is_failed() {
            anyhow::bail!(
                "VM must not fail when finalizing block: {:#?}",
                finished.block_tip_execution_result.result
            );
        }
        Ok(finished)
    }

    /// Executes `tx` with bytecode compression and, if compression fails, rolls back and
    /// executes it again without compression.
    ///
    /// # Errors
    ///
    /// Fails if compression is still reported as failed on the uncompressed retry.
    fn execute_tx_in_vm_with_optional_compression(
        &self,
        tx: &Transaction,
        vm: &mut BatchVm<S, Tr>,
    ) -> anyhow::Result<BatchTransactionExecutionResult> {
        let compressed_attempt = vm.inspect_transaction(
            tx.clone(),
            true,
            Arc::clone(&self.legacy_bootloader_debug_result),
        );

        let BatchTransactionExecutionResult {
            tx_result,
            compression_result,
            call_traces,
        } = if compressed_attempt.compression_result.is_ok() {
            compressed_attempt
        } else {
            // Undo the failed attempt and re-create the pre-transaction snapshot so the
            // transaction can still be rolled back after the retry.
            vm.rollback_to_the_latest_snapshot();
            vm.make_snapshot();
            vm.inspect_transaction(
                tx.clone(),
                false,
                Arc::clone(&self.legacy_bootloader_debug_result),
            )
        };

        // A no-op for a successful first attempt; only the retry can trip this check.
        compression_result.context("compression failed when it wasn't applied")?;
        Ok(BatchTransactionExecutionResult {
            tx_result,
            compression_result: Ok(()),
            call_traces,
        })
    }

    /// Executes `tx` with mandatory bytecode compression.
    ///
    /// If compression fails, the execution result is replaced with a
    /// `Halt::FailedToPublishCompressedBytecodes` halt and the call traces are discarded.
    /// The returned `compression_result` is `Ok(())` in both cases.
    fn execute_tx_in_vm(
        &self,
        tx: &Transaction,
        vm: &mut BatchVm<S, Tr>,
    ) -> anyhow::Result<BatchTransactionExecutionResult> {
        let BatchTransactionExecutionResult {
            mut tx_result,
            compression_result,
            mut call_traces,
        } = vm.inspect_transaction(
            tx.clone(),
            true,
            Arc::clone(&self.legacy_bootloader_debug_result),
        );

        if compression_result.is_err() {
            tx_result.result = ExecutionResult::Halt {
                reason: Halt::FailedToPublishCompressedBytecodes,
            };
            call_traces = Vec::new();
        }

        Ok(BatchTransactionExecutionResult {
            tx_result,
            compression_result: Ok(()),
            call_traces,
        })
    }

    /// Runs the bootloader, failing if its execution result is a failure.
    fn bootloader(&self, vm: &mut BatchVm<S, Tr>) -> anyhow::Result<VmExecutionResultAndLogs> {
        let output = vm.bootloader();
        if output.result.is_failed() {
            anyhow::bail!(
                "VM must not fail when running bootloader: {:#?}",
                output.result
            );
        }
        Ok(output)
    }
}