anvil_zksync_core/node/batch/
executor.rs

1use std::{error::Error as StdError, sync::Arc};
2
3use anyhow::Context as _;
4use async_trait::async_trait;
5use tokio::{
6    sync::{mpsc, oneshot},
7    task::JoinHandle,
8};
9use zksync_multivm::interface::{
10    executor::BatchExecutor,
11    storage::{ReadStorage, StorageView},
12    BatchTransactionExecutionResult, FinishedL1Batch, L2BlockEnv, VmExecutionResultAndLogs,
13};
14use zksync_types::Transaction;
15
16#[derive(Debug)]
17enum HandleOrError<S> {
18    Handle(JoinHandle<anyhow::Result<StorageView<S>>>),
19    Err(Arc<dyn StdError + Send + Sync>),
20}
21
22impl<S> HandleOrError<S> {
23    async fn wait_for_error(&mut self) -> anyhow::Error {
24        let err_arc = match self {
25            Self::Handle(handle) => {
26                let err = match handle.await {
27                    Ok(Ok(_)) => anyhow::anyhow!("batch executor unexpectedly stopped"),
28                    Ok(Err(err)) => err,
29                    Err(err) => anyhow::Error::new(err).context("batch executor panicked"),
30                };
31                let err: Box<dyn StdError + Send + Sync> = err.into();
32                let err: Arc<dyn StdError + Send + Sync> = err.into();
33                *self = Self::Err(err.clone());
34                err
35            }
36            Self::Err(err) => err.clone(),
37        };
38        anyhow::Error::new(err_arc)
39    }
40
41    async fn wait(self) -> anyhow::Result<StorageView<S>> {
42        match self {
43            Self::Handle(handle) => handle.await.context("batch executor panicked")?,
44            Self::Err(err_arc) => Err(anyhow::Error::new(err_arc)),
45        }
46    }
47}
48
49/// "Main" [`BatchExecutor`] implementation instantiating a VM in a blocking Tokio thread.
50#[derive(Debug)]
51pub struct MainBatchExecutor<S> {
52    handle: HandleOrError<S>,
53    commands: mpsc::Sender<Command>,
54}
55
56impl<S: ReadStorage> MainBatchExecutor<S> {
57    pub(super) fn new(
58        handle: JoinHandle<anyhow::Result<StorageView<S>>>,
59        commands: mpsc::Sender<Command>,
60    ) -> Self {
61        Self {
62            handle: HandleOrError::Handle(handle),
63            commands,
64        }
65    }
66
67    /// Custom method (not present in zksync-era) that runs bootloader once thus applying the bare
68    /// minimum of changes to the state on batch sealing. Not as time-consuming as [`Self::finish_batch`].
69    ///
70    /// To be deleted once we stop sealing batches on every block.
71    pub(crate) async fn bootloader(
72        mut self,
73    ) -> anyhow::Result<(VmExecutionResultAndLogs, StorageView<S>)> {
74        let (response_sender, response_receiver) = oneshot::channel();
75        let send_failed = self
76            .commands
77            .send(Command::Bootloader(response_sender))
78            .await
79            .is_err();
80        if send_failed {
81            return Err(self.handle.wait_for_error().await);
82        }
83
84        let bootloader_result = match response_receiver.await {
85            Ok(batch) => batch,
86            Err(_) => return Err(self.handle.wait_for_error().await),
87        };
88        let storage_view = self.handle.wait().await?;
89
90        Ok((bootloader_result, storage_view))
91    }
92}
93
94#[async_trait]
95impl<S> BatchExecutor<S> for MainBatchExecutor<S>
96where
97    S: ReadStorage + Send + 'static,
98{
99    #[tracing::instrument(skip_all)]
100    async fn execute_tx(
101        &mut self,
102        tx: Transaction,
103    ) -> anyhow::Result<BatchTransactionExecutionResult> {
104        let (response_sender, response_receiver) = oneshot::channel();
105        let send_failed = self
106            .commands
107            .send(Command::ExecuteTx(Box::new(tx), response_sender))
108            .await
109            .is_err();
110        if send_failed {
111            return Err(self.handle.wait_for_error().await);
112        }
113
114        let res = match response_receiver.await {
115            Ok(res) => res,
116            Err(_) => return Err(self.handle.wait_for_error().await),
117        };
118
119        Ok(res)
120    }
121
122    #[tracing::instrument(skip_all)]
123    async fn rollback_last_tx(&mut self) -> anyhow::Result<()> {
124        // While we don't get anything from the channel, it's useful to have it as a confirmation that the operation
125        // indeed has been processed.
126        let (response_sender, response_receiver) = oneshot::channel();
127        let send_failed = self
128            .commands
129            .send(Command::RollbackLastTx(response_sender))
130            .await
131            .is_err();
132        if send_failed {
133            return Err(self.handle.wait_for_error().await);
134        }
135
136        if response_receiver.await.is_err() {
137            return Err(self.handle.wait_for_error().await);
138        }
139        Ok(())
140    }
141
142    #[tracing::instrument(skip_all)]
143    async fn start_next_l2_block(&mut self, env: L2BlockEnv) -> anyhow::Result<()> {
144        // While we don't get anything from the channel, it's useful to have it as a confirmation that the operation
145        // indeed has been processed.
146        let (response_sender, response_receiver) = oneshot::channel();
147        let send_failed = self
148            .commands
149            .send(Command::StartNextL2Block(env, response_sender))
150            .await
151            .is_err();
152        if send_failed {
153            return Err(self.handle.wait_for_error().await);
154        }
155
156        if response_receiver.await.is_err() {
157            return Err(self.handle.wait_for_error().await);
158        }
159        Ok(())
160    }
161
162    #[tracing::instrument(skip_all)]
163    async fn finish_batch(
164        mut self: Box<Self>,
165    ) -> anyhow::Result<(FinishedL1Batch, StorageView<S>)> {
166        let (response_sender, response_receiver) = oneshot::channel();
167        let send_failed = self
168            .commands
169            .send(Command::FinishBatch(response_sender))
170            .await
171            .is_err();
172        if send_failed {
173            return Err(self.handle.wait_for_error().await);
174        }
175
176        let finished_batch = match response_receiver.await {
177            Ok(batch) => batch,
178            Err(_) => return Err(self.handle.wait_for_error().await),
179        };
180        let storage_view = self.handle.wait().await?;
181        Ok((finished_batch, storage_view))
182    }
183}
184
185#[derive(Debug)]
186pub(super) enum Command {
187    ExecuteTx(
188        Box<Transaction>,
189        oneshot::Sender<BatchTransactionExecutionResult>,
190    ),
191    StartNextL2Block(L2BlockEnv, oneshot::Sender<()>),
192    RollbackLastTx(oneshot::Sender<()>),
193    FinishBatch(oneshot::Sender<FinishedL1Batch>),
194    Bootloader(oneshot::Sender<VmExecutionResultAndLogs>),
195}