1use crate::contracts;
2use crate::zkstack_config::ZkstackConfig;
3use alloy::consensus::{SidecarBuilder, SimpleCoder};
4use alloy::network::{ReceiptResponse, TransactionBuilder, TransactionBuilder4844};
5use alloy::providers::Provider;
6use alloy::rpc::types::TransactionRequest;
7use std::sync::Arc;
8use tokio::sync::{mpsc, oneshot};
9use zksync_mini_merkle_tree::MiniMerkleTree;
10use zksync_types::commitment::L1BatchWithMetadata;
11use zksync_types::hasher::keccak::KeccakHasher;
12use zksync_types::l1::L1Tx;
13use zksync_types::{Address, L2ChainId, H256};
14
15pub struct L1Sender {
17 provider: Arc<dyn Provider + 'static>,
18 l2_chain_id: L2ChainId,
19 validator_timelock_addr: Address,
20 command_receiver: mpsc::Receiver<Command>,
21 last_committed_l1_batch: L1BatchWithMetadata,
22 last_proved_l1_batch: L1BatchWithMetadata,
23 l1_tx_merkle_tree: MiniMerkleTree<L1Tx>,
25}
26
27impl L1Sender {
28 pub fn new(
34 zkstack_config: &ZkstackConfig,
35 genesis_metadata: L1BatchWithMetadata,
36 provider: Arc<dyn Provider + 'static>,
37 ) -> (Self, L1SenderHandle) {
38 let (command_sender, command_receiver) = mpsc::channel(128);
39 let this = Self {
40 provider,
41 l2_chain_id: zkstack_config.genesis.l2_chain_id,
42 validator_timelock_addr: zkstack_config.contracts.l1.validator_timelock_addr,
43 command_receiver,
44 last_committed_l1_batch: genesis_metadata.clone(),
45 last_proved_l1_batch: genesis_metadata,
46 l1_tx_merkle_tree: MiniMerkleTree::<L1Tx>::from_hashes(
47 KeccakHasher,
48 std::iter::empty(),
49 None,
50 ),
51 };
52 let handle = L1SenderHandle { command_sender };
53 (this, handle)
54 }
55
56 pub async fn run(mut self) -> anyhow::Result<()> {
59 while let Some(command) = self.command_receiver.recv().await {
60 match command {
61 Command::Commit(batch, reply) => self.commit(batch, reply).await,
62 Command::Prove(batch, reply) => self.prove(batch, reply).await,
63 Command::Execute(batch, reply) => self.execute(batch, reply).await,
64 }
65 }
66
67 tracing::trace!("channel has been closed; stopping L1 sender");
68 Ok(())
69 }
70}
71
72impl L1Sender {
73 async fn commit(
74 &mut self,
75 batch: L1BatchWithMetadata,
76 reply: oneshot::Sender<anyhow::Result<H256>>,
77 ) {
78 let result = self.commit_fallible(&batch).await;
79 if result.is_ok() {
80 self.last_committed_l1_batch = batch;
82 }
83
84 let result = if let Err(result) = reply.send(result) {
86 tracing::info!("failed to reply as receiver has been dropped");
87 result
88 } else {
89 return;
90 };
91 if let Err(err) = result {
93 tracing::error!("failed to commit batch: {:#?}", err);
94 }
95 }
96
97 async fn commit_fallible(&self, batch: &L1BatchWithMetadata) -> anyhow::Result<H256> {
98 let sidecar = SidecarBuilder::<SimpleCoder>::from_slice(&[]).build()?;
100
101 let call = contracts::commit_batches_shared_bridge_call(
102 self.l2_chain_id,
103 &self.last_committed_l1_batch,
104 batch,
105 );
106
107 let gas_price = self.provider.get_gas_price().await?;
108 let eip1559_est = self.provider.estimate_eip1559_fees().await?;
109 let tx = TransactionRequest::default()
110 .with_to(self.validator_timelock_addr.0.into())
111 .with_max_fee_per_blob_gas(gas_price)
112 .with_max_fee_per_gas(eip1559_est.max_fee_per_gas)
113 .with_max_priority_fee_per_gas(eip1559_est.max_priority_fee_per_gas)
114 .with_gas_limit(15000000)
116 .with_call(&call)
117 .with_blob_sidecar(sidecar);
118
119 let pending_tx = self.provider.send_transaction(tx).await?;
120 tracing::debug!(
121 batch = batch.header.number.0,
122 pending_tx_hash = ?pending_tx.tx_hash(),
123 "batch commit transaction sent to L1"
124 );
125
126 let receipt = pending_tx.get_receipt().await?;
127 if receipt.status() {
128 tracing::info!(
131 batch = batch.header.number.0,
132 tx_hash = ?receipt.transaction_hash,
133 block_number = receipt.block_number.unwrap(),
134 "batch committed to L1",
135 );
136 } else {
137 tracing::error!(
138 batch = batch.header.number.0,
139 tx_hash = ?receipt.transaction_hash,
140 block_number = receipt.block_number.unwrap(),
141 "commit transaction failed"
142 );
143 anyhow::bail!(
144 "commit transaction failed, see L1 transaction's trace for more details (tx_hash='{:?}')",
145 receipt.transaction_hash
146 );
147 }
148
149 Ok(receipt.transaction_hash().0.into())
150 }
151
152 async fn prove(
153 &mut self,
154 batch: L1BatchWithMetadata,
155 reply: oneshot::Sender<anyhow::Result<H256>>,
156 ) {
157 let result = self.prove_fallible(&batch).await;
158 if result.is_ok() {
159 self.last_proved_l1_batch = batch;
161 }
162
163 let result = if let Err(result) = reply.send(result) {
165 tracing::info!("failed to reply as receiver has been dropped");
166 result
167 } else {
168 return;
169 };
170 if let Err(err) = result {
172 tracing::error!("failed to prove batch: {:#?}", err);
173 }
174 }
175
176 async fn prove_fallible(&self, batch: &L1BatchWithMetadata) -> anyhow::Result<H256> {
177 let sidecar = SidecarBuilder::<SimpleCoder>::from_slice(&[]).build()?;
179
180 let call = contracts::prove_batches_shared_bridge_call(
181 self.l2_chain_id,
182 &self.last_proved_l1_batch,
183 batch,
184 );
185
186 let gas_price = self.provider.get_gas_price().await?;
187 let eip1559_est = self.provider.estimate_eip1559_fees().await?;
188 let tx = TransactionRequest::default()
189 .with_to(self.validator_timelock_addr.0.into())
190 .with_max_fee_per_blob_gas(gas_price)
191 .with_max_fee_per_gas(eip1559_est.max_fee_per_gas)
192 .with_max_priority_fee_per_gas(eip1559_est.max_priority_fee_per_gas)
193 .with_gas_limit(15000000)
195 .with_call(&call)
196 .with_blob_sidecar(sidecar);
197
198 let pending_tx = self.provider.send_transaction(tx).await?;
199 tracing::debug!(
200 batch = batch.header.number.0,
201 pending_tx_hash = ?pending_tx.tx_hash(),
202 "batch prove transaction sent to L1"
203 );
204
205 let receipt = pending_tx.get_receipt().await?;
206 if receipt.status() {
207 tracing::info!(
210 batch = batch.header.number.0,
211 tx_hash = ?receipt.transaction_hash,
212 block_number = receipt.block_number.unwrap(),
213 "batch proved on L1",
214 );
215 } else {
216 tracing::error!(
217 batch = batch.header.number.0,
218 tx_hash = ?receipt.transaction_hash,
219 block_number = receipt.block_number.unwrap(),
220 "prove transaction failed"
221 );
222 anyhow::bail!(
223 "prove transaction failed, see L1 transaction's trace for more details (tx_hash='{:?}')",
224 receipt.transaction_hash
225 );
226 }
227
228 Ok(receipt.transaction_hash().0.into())
229 }
230
231 async fn execute(
232 &mut self,
233 batch: L1BatchWithMetadata,
234 reply: oneshot::Sender<anyhow::Result<H256>>,
235 ) {
236 let result = self.execute_fallible(&batch).await;
237
238 let result = if let Err(result) = reply.send(result) {
240 tracing::info!("failed to reply as receiver has been dropped");
241 result
242 } else {
243 return;
244 };
245 if let Err(err) = result {
247 tracing::error!("failed to execute batch: {:#?}", err);
248 }
249 }
250
251 async fn execute_fallible(&mut self, batch: &L1BatchWithMetadata) -> anyhow::Result<H256> {
252 let sidecar = SidecarBuilder::<SimpleCoder>::from_slice(&[]).build()?;
254
255 for priority_op in &batch.header.priority_ops_onchain_data {
257 self.l1_tx_merkle_tree
258 .push_hash(priority_op.onchain_data_hash);
259 }
260 let call = contracts::execute_batches_shared_bridge_call(
262 self.l2_chain_id,
263 batch,
264 &self.l1_tx_merkle_tree,
265 );
266
267 let gas_price = self.provider.get_gas_price().await?;
268 let eip1559_est = self.provider.estimate_eip1559_fees().await?;
269 let tx = TransactionRequest::default()
270 .with_to(self.validator_timelock_addr.0.into())
271 .with_max_fee_per_blob_gas(gas_price)
272 .with_max_fee_per_gas(eip1559_est.max_fee_per_gas)
273 .with_max_priority_fee_per_gas(eip1559_est.max_priority_fee_per_gas)
274 .with_gas_limit(15000000)
276 .with_call(&call)
277 .with_blob_sidecar(sidecar);
278
279 let pending_tx = self.provider.send_transaction(tx).await?;
280 tracing::debug!(
281 batch = batch.header.number.0,
282 pending_tx_hash = ?pending_tx.tx_hash(),
283 "batch execute transaction sent to L1"
284 );
285
286 let receipt = pending_tx.get_receipt().await?;
287 if receipt.status() {
288 tracing::info!(
291 batch = batch.header.number.0,
292 tx_hash = ?receipt.transaction_hash,
293 block_number = receipt.block_number.unwrap(),
294 "batch executed on L1",
295 );
296 } else {
297 tracing::error!(
298 batch = batch.header.number.0,
299 tx_hash = ?receipt.transaction_hash,
300 block_number = receipt.block_number.unwrap(),
301 "execute transaction failed"
302 );
303 anyhow::bail!(
304 "execute transaction failed, see L1 transaction's trace for more details (tx_hash='{:?}')",
305 receipt.transaction_hash
306 );
307 }
308
309 Ok(receipt.transaction_hash().0.into())
310 }
311}
312
313#[derive(Clone, Debug)]
316pub struct L1SenderHandle {
317 command_sender: mpsc::Sender<Command>,
318}
319
320impl L1SenderHandle {
321 pub async fn commit_sync(&self, batch: L1BatchWithMetadata) -> anyhow::Result<H256> {
324 let (response_sender, response_receiver) = oneshot::channel();
325 self.command_sender
326 .send(Command::Commit(batch, response_sender))
327 .await
328 .map_err(|_| anyhow::anyhow!("failed to commit a batch as L1 sender is dropped"))?;
329
330 match response_receiver.await {
331 Ok(result) => result,
332 Err(_) => anyhow::bail!("failed to commit a batch as L1 sender is dropped"),
333 }
334 }
335
336 pub async fn prove_sync(&self, batch: L1BatchWithMetadata) -> anyhow::Result<H256> {
339 let (response_sender, response_receiver) = oneshot::channel();
340 self.command_sender
341 .send(Command::Prove(batch, response_sender))
342 .await
343 .map_err(|_| anyhow::anyhow!("failed to prove a batch as L1 sender is dropped"))?;
344
345 match response_receiver.await {
346 Ok(result) => result,
347 Err(_) => anyhow::bail!("failed to prove a batch as L1 sender is dropped"),
348 }
349 }
350
351 pub async fn execute_sync(&self, batch: L1BatchWithMetadata) -> anyhow::Result<H256> {
354 let (response_sender, response_receiver) = oneshot::channel();
355 self.command_sender
356 .send(Command::Execute(batch, response_sender))
357 .await
358 .map_err(|_| anyhow::anyhow!("failed to execute a batch as L1 sender is dropped"))?;
359
360 match response_receiver.await {
361 Ok(result) => result,
362 Err(_) => anyhow::bail!("failed to execute a batch as L1 sender is dropped"),
363 }
364 }
365}
366
367#[derive(Debug)]
368enum Command {
369 Commit(L1BatchWithMetadata, oneshot::Sender<anyhow::Result<H256>>),
370 Prove(L1BatchWithMetadata, oneshot::Sender<anyhow::Result<H256>>),
371 Execute(L1BatchWithMetadata, oneshot::Sender<anyhow::Result<H256>>),
372}