anvil_zksync_l1_sidecar/
l1_watcher.rs1use crate::contracts::NewPriorityRequest;
2use crate::zkstack_config::ZkstackConfig;
3use alloy::eips::BlockId;
4use alloy::providers::Provider;
5use alloy::rpc::types::Filter;
6use alloy::sol_types::SolEvent;
7use anvil_zksync_core::node::TxPool;
8use anyhow::Context;
9use std::sync::Arc;
10use std::time::Duration;
11use zksync_types::l1::L1Tx;
12use zksync_types::{PriorityOpId, L2_MESSAGE_ROOT_ADDRESS};
13
14pub struct L1Watcher {
16 provider: Arc<dyn Provider + 'static>,
17 pool: TxPool,
18 addresses: Vec<alloy::primitives::Address>,
19
20 next_expected_priority_id: PriorityOpId,
21 from_block: u64,
22}
23
24impl L1Watcher {
25 pub fn new(
26 zkstack_config: &ZkstackConfig,
27 provider: Arc<dyn Provider + 'static>,
28 pool: TxPool,
29 ) -> Self {
30 let addresses = vec![
31 alloy::primitives::Address::from(zkstack_config.contracts.l1.diamond_proxy_addr.0),
32 alloy::primitives::Address::from(zkstack_config.contracts.l1.governance_addr.0),
33 alloy::primitives::Address::from(
34 zkstack_config
35 .contracts
36 .ecosystem_contracts
37 .state_transition_proxy_addr
38 .0,
39 ),
40 alloy::primitives::Address::from(zkstack_config.contracts.l1.chain_admin_addr.0),
41 alloy::primitives::Address::from(L2_MESSAGE_ROOT_ADDRESS.0),
42 ];
43 Self {
44 provider,
45 pool,
46 addresses,
47 next_expected_priority_id: PriorityOpId(0),
48 from_block: 0,
49 }
50 }
51
52 pub async fn run(mut self) -> anyhow::Result<()> {
54 let mut timer = tokio::time::interval(Duration::from_millis(100));
55 loop {
56 timer.tick().await;
57 self.poll().await?;
58 }
59 }
60}
61
62impl L1Watcher {
63 async fn poll(&mut self) -> anyhow::Result<()> {
64 let latest_block = self
65 .provider
66 .get_block(BlockId::latest())
67 .await?
68 .context("L1 does not have any block")?;
69 let to_block = latest_block.header.number;
70 if self.from_block > to_block {
71 return Ok(());
72 }
73 let filter = Filter::new()
74 .from_block(self.from_block)
75 .to_block(to_block)
76 .event_signature(NewPriorityRequest::SIGNATURE_HASH)
77 .address(self.addresses.clone());
78 let events = self.provider.get_logs(&filter).await?;
79 let mut priority_txs = Vec::new();
80 for event in events {
81 let zksync_log: zksync_types::web3::Log =
82 serde_json::from_value(serde_json::to_value(event)?)?;
83 let tx = L1Tx::try_from(zksync_log)?;
84 priority_txs.push(tx);
85 }
86
87 if priority_txs.is_empty() {
88 return Ok(());
89 }
90 let first = priority_txs.first().unwrap();
92 let last = priority_txs.last().unwrap();
93 tracing::info!(
94 first_serial_id = %first.serial_id(),
95 last_serial_id = %last.serial_id(),
96 first_block = %first.eth_block(),
97 last_block = %last.eth_block(),
98 "received priority requests",
99 );
100 anyhow::ensure!(
101 last.serial_id().0 - first.serial_id().0 + 1 == priority_txs.len() as u64,
102 "there is a gap in priority transactions received"
103 );
104 let new_txs: Vec<_> = priority_txs
105 .into_iter()
106 .skip_while(|tx| tx.serial_id() < self.next_expected_priority_id)
107 .collect();
108
109 if new_txs.is_empty() {
110 return Ok(());
111 }
112 let first = new_txs.first().unwrap();
113 let last = new_txs.last().unwrap();
114 anyhow::ensure!(
115 first.serial_id() == self.next_expected_priority_id,
116 "priority transaction serial id mismatch"
117 );
118
119 let next_expected_priority_id = last.serial_id().next();
120 for tx in new_txs {
121 tracing::debug!(
122 hash = ?tx.hash(),
123 "adding new priority transaction to mempool",
124 );
125 self.pool.add_tx(tx.into());
126 }
127 self.next_expected_priority_id = next_expected_priority_id;
128 self.from_block = to_block + 1;
129
130 Ok(())
131 }
132}