anvil_zksync_core/node/batch/
executor.rs1use std::{error::Error as StdError, sync::Arc};
2
3use anyhow::Context as _;
4use async_trait::async_trait;
5use tokio::{
6 sync::{mpsc, oneshot},
7 task::JoinHandle,
8};
9use zksync_multivm::interface::{
10 executor::BatchExecutor,
11 storage::{ReadStorage, StorageView},
12 BatchTransactionExecutionResult, FinishedL1Batch, L2BlockEnv, VmExecutionResultAndLogs,
13};
14use zksync_types::Transaction;
15
16#[derive(Debug)]
17enum HandleOrError<S> {
18 Handle(JoinHandle<anyhow::Result<StorageView<S>>>),
19 Err(Arc<dyn StdError + Send + Sync>),
20}
21
22impl<S> HandleOrError<S> {
23 async fn wait_for_error(&mut self) -> anyhow::Error {
24 let err_arc = match self {
25 Self::Handle(handle) => {
26 let err = match handle.await {
27 Ok(Ok(_)) => anyhow::anyhow!("batch executor unexpectedly stopped"),
28 Ok(Err(err)) => err,
29 Err(err) => anyhow::Error::new(err).context("batch executor panicked"),
30 };
31 let err: Box<dyn StdError + Send + Sync> = err.into();
32 let err: Arc<dyn StdError + Send + Sync> = err.into();
33 *self = Self::Err(err.clone());
34 err
35 }
36 Self::Err(err) => err.clone(),
37 };
38 anyhow::Error::new(err_arc)
39 }
40
41 async fn wait(self) -> anyhow::Result<StorageView<S>> {
42 match self {
43 Self::Handle(handle) => handle.await.context("batch executor panicked")?,
44 Self::Err(err_arc) => Err(anyhow::Error::new(err_arc)),
45 }
46 }
47}
48
49#[derive(Debug)]
51pub struct MainBatchExecutor<S> {
52 handle: HandleOrError<S>,
53 commands: mpsc::Sender<Command>,
54}
55
56impl<S: ReadStorage> MainBatchExecutor<S> {
57 pub(super) fn new(
58 handle: JoinHandle<anyhow::Result<StorageView<S>>>,
59 commands: mpsc::Sender<Command>,
60 ) -> Self {
61 Self {
62 handle: HandleOrError::Handle(handle),
63 commands,
64 }
65 }
66
67 pub(crate) async fn bootloader(
72 mut self,
73 ) -> anyhow::Result<(VmExecutionResultAndLogs, StorageView<S>)> {
74 let (response_sender, response_receiver) = oneshot::channel();
75 let send_failed = self
76 .commands
77 .send(Command::Bootloader(response_sender))
78 .await
79 .is_err();
80 if send_failed {
81 return Err(self.handle.wait_for_error().await);
82 }
83
84 let bootloader_result = match response_receiver.await {
85 Ok(batch) => batch,
86 Err(_) => return Err(self.handle.wait_for_error().await),
87 };
88 let storage_view = self.handle.wait().await?;
89
90 Ok((bootloader_result, storage_view))
91 }
92}
93
94#[async_trait]
95impl<S> BatchExecutor<S> for MainBatchExecutor<S>
96where
97 S: ReadStorage + Send + 'static,
98{
99 #[tracing::instrument(skip_all)]
100 async fn execute_tx(
101 &mut self,
102 tx: Transaction,
103 ) -> anyhow::Result<BatchTransactionExecutionResult> {
104 let (response_sender, response_receiver) = oneshot::channel();
105 let send_failed = self
106 .commands
107 .send(Command::ExecuteTx(Box::new(tx), response_sender))
108 .await
109 .is_err();
110 if send_failed {
111 return Err(self.handle.wait_for_error().await);
112 }
113
114 let res = match response_receiver.await {
115 Ok(res) => res,
116 Err(_) => return Err(self.handle.wait_for_error().await),
117 };
118
119 Ok(res)
120 }
121
122 #[tracing::instrument(skip_all)]
123 async fn rollback_last_tx(&mut self) -> anyhow::Result<()> {
124 let (response_sender, response_receiver) = oneshot::channel();
127 let send_failed = self
128 .commands
129 .send(Command::RollbackLastTx(response_sender))
130 .await
131 .is_err();
132 if send_failed {
133 return Err(self.handle.wait_for_error().await);
134 }
135
136 if response_receiver.await.is_err() {
137 return Err(self.handle.wait_for_error().await);
138 }
139 Ok(())
140 }
141
142 #[tracing::instrument(skip_all)]
143 async fn start_next_l2_block(&mut self, env: L2BlockEnv) -> anyhow::Result<()> {
144 let (response_sender, response_receiver) = oneshot::channel();
147 let send_failed = self
148 .commands
149 .send(Command::StartNextL2Block(env, response_sender))
150 .await
151 .is_err();
152 if send_failed {
153 return Err(self.handle.wait_for_error().await);
154 }
155
156 if response_receiver.await.is_err() {
157 return Err(self.handle.wait_for_error().await);
158 }
159 Ok(())
160 }
161
162 #[tracing::instrument(skip_all)]
163 async fn finish_batch(
164 mut self: Box<Self>,
165 ) -> anyhow::Result<(FinishedL1Batch, StorageView<S>)> {
166 let (response_sender, response_receiver) = oneshot::channel();
167 let send_failed = self
168 .commands
169 .send(Command::FinishBatch(response_sender))
170 .await
171 .is_err();
172 if send_failed {
173 return Err(self.handle.wait_for_error().await);
174 }
175
176 let finished_batch = match response_receiver.await {
177 Ok(batch) => batch,
178 Err(_) => return Err(self.handle.wait_for_error().await),
179 };
180 let storage_view = self.handle.wait().await?;
181 Ok((finished_batch, storage_view))
182 }
183}
184
185#[derive(Debug)]
186pub(super) enum Command {
187 ExecuteTx(
188 Box<Transaction>,
189 oneshot::Sender<BatchTransactionExecutionResult>,
190 ),
191 StartNextL2Block(L2BlockEnv, oneshot::Sender<()>),
192 RollbackLastTx(oneshot::Sender<()>),
193 FinishBatch(oneshot::Sender<FinishedL1Batch>),
194 Bootloader(oneshot::Sender<VmExecutionResultAndLogs>),
195}