anvil_zksync_l1_sidecar/
l1_watcher.rs

1use crate::contracts::NewPriorityRequest;
2use crate::zkstack_config::ZkstackConfig;
3use alloy::eips::BlockId;
4use alloy::providers::Provider;
5use alloy::rpc::types::Filter;
6use alloy::sol_types::SolEvent;
7use anvil_zksync_core::node::TxPool;
8use anyhow::Context;
9use std::sync::Arc;
10use std::time::Duration;
11use zksync_types::l1::L1Tx;
12use zksync_types::{PriorityOpId, L2_MESSAGE_ROOT_ADDRESS};
13
14/// Node component responsible for saving new priority L1 transactions to transaction pool.
15pub struct L1Watcher {
16    provider: Arc<dyn Provider + 'static>,
17    pool: TxPool,
18    addresses: Vec<alloy::primitives::Address>,
19
20    next_expected_priority_id: PriorityOpId,
21    from_block: u64,
22}
23
24impl L1Watcher {
25    pub fn new(
26        zkstack_config: &ZkstackConfig,
27        provider: Arc<dyn Provider + 'static>,
28        pool: TxPool,
29    ) -> Self {
30        let addresses = vec![
31            alloy::primitives::Address::from(zkstack_config.contracts.l1.diamond_proxy_addr.0),
32            alloy::primitives::Address::from(zkstack_config.contracts.l1.governance_addr.0),
33            alloy::primitives::Address::from(
34                zkstack_config
35                    .contracts
36                    .ecosystem_contracts
37                    .state_transition_proxy_addr
38                    .0,
39            ),
40            alloy::primitives::Address::from(zkstack_config.contracts.l1.chain_admin_addr.0),
41            alloy::primitives::Address::from(L2_MESSAGE_ROOT_ADDRESS.0),
42        ];
43        Self {
44            provider,
45            pool,
46            addresses,
47            next_expected_priority_id: PriorityOpId(0),
48            from_block: 0,
49        }
50    }
51
52    /// Runs L1 watcher indefinitely thus saving all incoming L1 transaction to the pool.
53    pub async fn run(mut self) -> anyhow::Result<()> {
54        let mut timer = tokio::time::interval(Duration::from_millis(100));
55        loop {
56            timer.tick().await;
57            self.poll().await?;
58        }
59    }
60}
61
62impl L1Watcher {
63    async fn poll(&mut self) -> anyhow::Result<()> {
64        let latest_block = self
65            .provider
66            .get_block(BlockId::latest())
67            .await?
68            .context("L1 does not have any block")?;
69        let to_block = latest_block.header.number;
70        if self.from_block > to_block {
71            return Ok(());
72        }
73        let filter = Filter::new()
74            .from_block(self.from_block)
75            .to_block(to_block)
76            .event_signature(NewPriorityRequest::SIGNATURE_HASH)
77            .address(self.addresses.clone());
78        let events = self.provider.get_logs(&filter).await?;
79        let mut priority_txs = Vec::new();
80        for event in events {
81            let zksync_log: zksync_types::web3::Log =
82                serde_json::from_value(serde_json::to_value(event)?)?;
83            let tx = L1Tx::try_from(zksync_log)?;
84            priority_txs.push(tx);
85        }
86
87        if priority_txs.is_empty() {
88            return Ok(());
89        }
90        // unwraps are safe because the vec is not empty
91        let first = priority_txs.first().unwrap();
92        let last = priority_txs.last().unwrap();
93        tracing::info!(
94            first_serial_id = %first.serial_id(),
95            last_serial_id = %last.serial_id(),
96            first_block = %first.eth_block(),
97            last_block = %last.eth_block(),
98            "received priority requests",
99        );
100        anyhow::ensure!(
101            last.serial_id().0 - first.serial_id().0 + 1 == priority_txs.len() as u64,
102            "there is a gap in priority transactions received"
103        );
104        let new_txs: Vec<_> = priority_txs
105            .into_iter()
106            .skip_while(|tx| tx.serial_id() < self.next_expected_priority_id)
107            .collect();
108
109        if new_txs.is_empty() {
110            return Ok(());
111        }
112        let first = new_txs.first().unwrap();
113        let last = new_txs.last().unwrap();
114        anyhow::ensure!(
115            first.serial_id() == self.next_expected_priority_id,
116            "priority transaction serial id mismatch"
117        );
118
119        let next_expected_priority_id = last.serial_id().next();
120        for tx in new_txs {
121            tracing::debug!(
122                hash = ?tx.hash(),
123                "adding new priority transaction to mempool",
124            );
125            self.pool.add_tx(tx.into());
126        }
127        self.next_expected_priority_id = next_expected_priority_id;
128        self.from_block = to_block + 1;
129
130        Ok(())
131    }
132}