anvil_zksync_l1_sidecar/l1_sender.rs

use crate::contracts;
use crate::zkstack_config::ZkstackConfig;
use alloy::consensus::{SidecarBuilder, SimpleCoder};
use alloy::network::{ReceiptResponse, TransactionBuilder, TransactionBuilder4844};
use alloy::providers::Provider;
use alloy::rpc::types::TransactionRequest;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use zksync_mini_merkle_tree::MiniMerkleTree;
use zksync_types::commitment::L1BatchWithMetadata;
use zksync_types::hasher::keccak::KeccakHasher;
use zksync_types::l1::L1Tx;
use zksync_types::{Address, L2ChainId, H256};

/// Node component responsible for sending transactions to L1.
pub struct L1Sender {
    provider: Arc<dyn Provider + 'static>,
    l2_chain_id: L2ChainId,
    validator_timelock_addr: Address,
    command_receiver: mpsc::Receiver<Command>,
    last_committed_l1_batch: L1BatchWithMetadata,
    last_proved_l1_batch: L1BatchWithMetadata,
    /// Merkle tree with all priority transactions ever processed.
    l1_tx_merkle_tree: MiniMerkleTree<L1Tx>,
}

impl L1Sender {
    /// Initializes a new [`L1Sender`] that will send transactions using the supplied provider.
    /// Assumes that the zkstack config matches the L1 configuration at the other end of the provider.
    ///
    /// The resulting [`L1Sender`] is expected to be consumed by calling [`Self::run`]. Additionally,
    /// returns a cloneable handle that can be used to send requests to this instance of [`L1Sender`].
    pub fn new(
        zkstack_config: &ZkstackConfig,
        genesis_metadata: L1BatchWithMetadata,
        provider: Arc<dyn Provider + 'static>,
    ) -> (Self, L1SenderHandle) {
        let (command_sender, command_receiver) = mpsc::channel(128);
        let this = Self {
            provider,
            l2_chain_id: zkstack_config.genesis.l2_chain_id,
            validator_timelock_addr: zkstack_config.contracts.l1.validator_timelock_addr,
            command_receiver,
            last_committed_l1_batch: genesis_metadata.clone(),
            last_proved_l1_batch: genesis_metadata,
            l1_tx_merkle_tree: MiniMerkleTree::<L1Tx>::from_hashes(
                KeccakHasher,
                std::iter::empty(),
                None,
            ),
        };
        let handle = L1SenderHandle { command_sender };
        (this, handle)
    }

    /// Runs the L1 sender indefinitely, processing requests received from any of the matching
    /// handles.
    pub async fn run(mut self) -> anyhow::Result<()> {
        while let Some(command) = self.command_receiver.recv().await {
            match command {
                Command::Commit(batch, reply) => self.commit(batch, reply).await,
                Command::Prove(batch, reply) => self.prove(batch, reply).await,
                Command::Execute(batch, reply) => self.execute(batch, reply).await,
            }
        }

        tracing::trace!("channel has been closed; stopping L1 sender");
        Ok(())
    }
}
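
// Illustrative usage sketch (not part of this module): how a caller might construct the sender,
// spawn its processing loop and keep the handle around. `zkstack_config`, `genesis_metadata` and
// `provider` are hypothetical values assumed to come from the node's startup path.
//
//     let (l1_sender, handle) = L1Sender::new(&zkstack_config, genesis_metadata, provider);
//     tokio::spawn(l1_sender.run());
//     // `handle` can now be cloned and used to submit commit/prove/execute requests.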

impl L1Sender {
    async fn commit(
        &mut self,
        batch: L1BatchWithMetadata,
        reply: oneshot::Sender<anyhow::Result<H256>>,
    ) {
        let result = self.commit_fallible(&batch).await;
        if result.is_ok() {
            // Commitment was successful, update the last committed batch
            self.last_committed_l1_batch = batch;
        }

        // Reply to the sender if we can; if the receiver was dropped, recover the result for logging below
        let result = if let Err(result) = reply.send(result) {
            tracing::info!("failed to reply as receiver has been dropped");
            result
        } else {
            return;
        };
        // Not much we can do with an error at this level, so we just log it
        if let Err(err) = result {
            tracing::error!("failed to commit batch: {:#?}", err);
        }
    }

    async fn commit_fallible(&self, batch: &L1BatchWithMetadata) -> anyhow::Result<H256> {
        // Create a blob sidecar with empty data
        let sidecar = SidecarBuilder::<SimpleCoder>::from_slice(&[]).build()?;

        let call = contracts::commit_batches_shared_bridge_call(
            self.l2_chain_id,
            &self.last_committed_l1_batch,
            batch,
        );

        let gas_price = self.provider.get_gas_price().await?;
        let eip1559_est = self.provider.estimate_eip1559_fees().await?;
        let tx = TransactionRequest::default()
            .with_to(self.validator_timelock_addr.0.into())
            .with_max_fee_per_blob_gas(gas_price)
            .with_max_fee_per_gas(eip1559_est.max_fee_per_gas)
            .with_max_priority_fee_per_gas(eip1559_est.max_priority_fee_per_gas)
            // Default value for `max_aggregated_tx_gas` from zksync-era, which should always be enough
            .with_gas_limit(15_000_000)
            .with_call(&call)
            .with_blob_sidecar(sidecar);

        let pending_tx = self.provider.send_transaction(tx).await?;
        tracing::debug!(
            batch = batch.header.number.0,
            pending_tx_hash = ?pending_tx.tx_hash(),
            "batch commit transaction sent to L1"
        );

        let receipt = pending_tx.get_receipt().await?;
        if receipt.status() {
            // We could also look at the tx receipt's logs for a corresponding `BlockCommit` event, but
            // the existing logic is likely good enough for a test node.
            tracing::info!(
                batch = batch.header.number.0,
                tx_hash = ?receipt.transaction_hash,
                block_number = receipt.block_number.unwrap(),
                "batch committed to L1",
            );
        } else {
            tracing::error!(
                batch = batch.header.number.0,
                tx_hash = ?receipt.transaction_hash,
                block_number = receipt.block_number.unwrap(),
                "commit transaction failed"
            );
            anyhow::bail!(
                "commit transaction failed, see L1 transaction's trace for more details (tx_hash='{:?}')",
                receipt.transaction_hash
            );
        }

        Ok(receipt.transaction_hash().0.into())
    }

    async fn prove(
        &mut self,
        batch: L1BatchWithMetadata,
        reply: oneshot::Sender<anyhow::Result<H256>>,
    ) {
        let result = self.prove_fallible(&batch).await;
        if result.is_ok() {
            // Proving was successful, update the last proved batch
            self.last_proved_l1_batch = batch;
        }

        // Reply to the sender if we can; if the receiver was dropped, recover the result for logging below
        let result = if let Err(result) = reply.send(result) {
            tracing::info!("failed to reply as receiver has been dropped");
            result
        } else {
            return;
        };
        // Not much we can do with an error at this level, so we just log it
        if let Err(err) = result {
            tracing::error!("failed to prove batch: {:#?}", err);
        }
    }

    async fn prove_fallible(&self, batch: &L1BatchWithMetadata) -> anyhow::Result<H256> {
        // Create a blob sidecar with empty data
        let sidecar = SidecarBuilder::<SimpleCoder>::from_slice(&[]).build()?;

        let call = contracts::prove_batches_shared_bridge_call(
            self.l2_chain_id,
            &self.last_proved_l1_batch,
            batch,
        );

        let gas_price = self.provider.get_gas_price().await?;
        let eip1559_est = self.provider.estimate_eip1559_fees().await?;
        let tx = TransactionRequest::default()
            .with_to(self.validator_timelock_addr.0.into())
            .with_max_fee_per_blob_gas(gas_price)
            .with_max_fee_per_gas(eip1559_est.max_fee_per_gas)
            .with_max_priority_fee_per_gas(eip1559_est.max_priority_fee_per_gas)
            // Default value for `max_aggregated_tx_gas` from zksync-era, which should always be enough
            .with_gas_limit(15_000_000)
            .with_call(&call)
            .with_blob_sidecar(sidecar);

        let pending_tx = self.provider.send_transaction(tx).await?;
        tracing::debug!(
            batch = batch.header.number.0,
            pending_tx_hash = ?pending_tx.tx_hash(),
            "batch prove transaction sent to L1"
        );

        let receipt = pending_tx.get_receipt().await?;
        if receipt.status() {
            // We could also look at the tx receipt's logs for a corresponding `BlocksVerification` event, but
            // the existing logic is likely good enough for a test node.
            tracing::info!(
                batch = batch.header.number.0,
                tx_hash = ?receipt.transaction_hash,
                block_number = receipt.block_number.unwrap(),
                "batch proved on L1",
            );
        } else {
            tracing::error!(
                batch = batch.header.number.0,
                tx_hash = ?receipt.transaction_hash,
                block_number = receipt.block_number.unwrap(),
                "prove transaction failed"
            );
            anyhow::bail!(
                "prove transaction failed, see L1 transaction's trace for more details (tx_hash='{:?}')",
                receipt.transaction_hash
            );
        }

        Ok(receipt.transaction_hash().0.into())
    }

    async fn execute(
        &mut self,
        batch: L1BatchWithMetadata,
        reply: oneshot::Sender<anyhow::Result<H256>>,
    ) {
        let result = self.execute_fallible(&batch).await;

        // Reply to the sender if we can; if the receiver was dropped, recover the result for logging below
        let result = if let Err(result) = reply.send(result) {
            tracing::info!("failed to reply as receiver has been dropped");
            result
        } else {
            return;
        };
        // Not much we can do with an error at this level, so we just log it
        if let Err(err) = result {
            tracing::error!("failed to execute batch: {:#?}", err);
        }
    }

    async fn execute_fallible(&mut self, batch: &L1BatchWithMetadata) -> anyhow::Result<H256> {
        // Create a blob sidecar with empty data
        let sidecar = SidecarBuilder::<SimpleCoder>::from_slice(&[]).build()?;

        // Push new priority transactions into the Merkle tree
        for priority_op in &batch.header.priority_ops_onchain_data {
            self.l1_tx_merkle_tree
                .push_hash(priority_op.onchain_data_hash);
        }
        // Generate the execution call based on the batch and the updated priority transaction Merkle tree
        let call = contracts::execute_batches_shared_bridge_call(
            self.l2_chain_id,
            batch,
            &self.l1_tx_merkle_tree,
        );

        let gas_price = self.provider.get_gas_price().await?;
        let eip1559_est = self.provider.estimate_eip1559_fees().await?;
        let tx = TransactionRequest::default()
            .with_to(self.validator_timelock_addr.0.into())
            .with_max_fee_per_blob_gas(gas_price)
            .with_max_fee_per_gas(eip1559_est.max_fee_per_gas)
            .with_max_priority_fee_per_gas(eip1559_est.max_priority_fee_per_gas)
            // Default value for `max_aggregated_tx_gas` from zksync-era, which should always be enough
            .with_gas_limit(15_000_000)
            .with_call(&call)
            .with_blob_sidecar(sidecar);

        let pending_tx = self.provider.send_transaction(tx).await?;
        tracing::debug!(
            batch = batch.header.number.0,
            pending_tx_hash = ?pending_tx.tx_hash(),
            "batch execute transaction sent to L1"
        );

        let receipt = pending_tx.get_receipt().await?;
        if receipt.status() {
            // We could also look at the tx receipt's logs for a corresponding `BlockExecution` event, but
            // the existing logic is likely good enough for a test node.
            tracing::info!(
                batch = batch.header.number.0,
                tx_hash = ?receipt.transaction_hash,
                block_number = receipt.block_number.unwrap(),
                "batch executed on L1",
            );
        } else {
            tracing::error!(
                batch = batch.header.number.0,
                tx_hash = ?receipt.transaction_hash,
                block_number = receipt.block_number.unwrap(),
                "execute transaction failed"
            );
            anyhow::bail!(
                "execute transaction failed, see L1 transaction's trace for more details (tx_hash='{:?}')",
                receipt.transaction_hash
            );
        }

        Ok(receipt.transaction_hash().0.into())
    }
}

/// A cheap, cloneable handle to an [`L1Sender`] instance that can send requests and wait for them
/// to be processed.
#[derive(Clone, Debug)]
pub struct L1SenderHandle {
    command_sender: mpsc::Sender<Command>,
}

impl L1SenderHandle {
    /// Requests that [`L1Sender`] commit the provided batch. Waits until an L1 transaction committing
    /// the batch is submitted to L1 and returns its hash.
    pub async fn commit_sync(&self, batch: L1BatchWithMetadata) -> anyhow::Result<H256> {
        let (response_sender, response_receiver) = oneshot::channel();
        self.command_sender
            .send(Command::Commit(batch, response_sender))
            .await
            .map_err(|_| anyhow::anyhow!("failed to commit a batch as L1 sender is dropped"))?;

        match response_receiver.await {
            Ok(result) => result,
            Err(_) => anyhow::bail!("failed to commit a batch as L1 sender is dropped"),
        }
    }

    /// Requests that [`L1Sender`] prove the provided batch. Waits until an L1 transaction proving
    /// the batch is submitted to L1 and returns its hash.
    pub async fn prove_sync(&self, batch: L1BatchWithMetadata) -> anyhow::Result<H256> {
        let (response_sender, response_receiver) = oneshot::channel();
        self.command_sender
            .send(Command::Prove(batch, response_sender))
            .await
            .map_err(|_| anyhow::anyhow!("failed to prove a batch as L1 sender is dropped"))?;

        match response_receiver.await {
            Ok(result) => result,
            Err(_) => anyhow::bail!("failed to prove a batch as L1 sender is dropped"),
        }
    }

    /// Requests that [`L1Sender`] execute the provided batch. Waits until an L1 transaction executing
    /// the batch is submitted to L1 and returns its hash.
    pub async fn execute_sync(&self, batch: L1BatchWithMetadata) -> anyhow::Result<H256> {
        let (response_sender, response_receiver) = oneshot::channel();
        self.command_sender
            .send(Command::Execute(batch, response_sender))
            .await
            .map_err(|_| anyhow::anyhow!("failed to execute a batch as L1 sender is dropped"))?;

        match response_receiver.await {
            Ok(result) => result,
            Err(_) => anyhow::bail!("failed to execute a batch as L1 sender is dropped"),
        }
    }
}
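
// Illustrative usage sketch (not part of this module): driving a batch through its L1 lifecycle
// via the handle. `handle` is an `L1SenderHandle` and `batch` an `L1BatchWithMetadata` produced
// elsewhere by the node; both names are hypothetical.
//
//     let commit_tx_hash = handle.commit_sync(batch.clone()).await?;
//     let prove_tx_hash = handle.prove_sync(batch.clone()).await?;
//     let execute_tx_hash = handle.execute_sync(batch).await?;
//
// Each call resolves once the L1 sender has processed the request and returns the corresponding
// L1 transaction's hash.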

#[derive(Debug)]
enum Command {
    Commit(L1BatchWithMetadata, oneshot::Sender<anyhow::Result<H256>>),
    Prove(L1BatchWithMetadata, oneshot::Sender<anyhow::Result<H256>>),
    Execute(L1BatchWithMetadata, oneshot::Sender<anyhow::Result<H256>>),
}