anvil_zksync_api_server/
server.rs1use crate::{
2 AnvilNamespace, AnvilZksNamespace, ConfigNamespace, DebugNamespace, EthNamespace,
3 EthTestNamespace, EvmNamespace, NetNamespace, Web3Namespace, ZksNamespace,
4};
5use anvil_zksync_api_decl::{
6 AnvilNamespaceServer, AnvilZksNamespaceServer, ConfigNamespaceServer, DebugNamespaceServer,
7 EthNamespaceServer, EthTestNamespaceServer, EvmNamespaceServer, NetNamespaceServer,
8 Web3NamespaceServer, ZksNamespaceServer,
9};
10use anvil_zksync_core::node::InMemoryNode;
11use anvil_zksync_l1_sidecar::L1Sidecar;
12use futures::future::BoxFuture;
13use futures::FutureExt;
14use http::Method;
15use jsonrpsee::server::middleware::http::ProxyGetRequestLayer;
16use jsonrpsee::server::middleware::rpc::RpcServiceT;
17use jsonrpsee::server::{MethodResponse, RpcServiceBuilder, ServerBuilder, ServerHandle};
18use jsonrpsee::types::Request;
19use jsonrpsee::RpcModule;
20use std::net::SocketAddr;
21use tower_http::cors::{AllowOrigin, CorsLayer};
22use zksync_telemetry::{get_telemetry, TelemetryProps};
23
24#[derive(Clone)]
25pub struct NodeServerBuilder {
26 node: InMemoryNode,
27 l1_sidecar: L1Sidecar,
28 health_api_enabled: bool,
29 cors_enabled: bool,
30 allow_origin: AllowOrigin,
31}
32
33impl NodeServerBuilder {
34 pub fn new(node: InMemoryNode, l1_sidecar: L1Sidecar, allow_origin: AllowOrigin) -> Self {
35 Self {
36 node,
37 l1_sidecar,
38 health_api_enabled: false,
39 cors_enabled: false,
40 allow_origin,
41 }
42 }
43
44 pub fn enable_health_api(&mut self) {
45 self.health_api_enabled = true;
46 }
47
48 pub fn enable_cors(&mut self) {
49 self.cors_enabled = true;
50 }
51
52 fn default_rpc(node: InMemoryNode, l1_sidecar: L1Sidecar) -> RpcModule<()> {
53 let mut rpc = RpcModule::new(());
54 rpc.merge(EthNamespace::new(node.clone()).into_rpc())
55 .unwrap();
56 rpc.merge(EthTestNamespace::new(node.clone()).into_rpc())
57 .unwrap();
58 rpc.merge(AnvilNamespace::new(node.clone()).into_rpc())
59 .unwrap();
60 rpc.merge(AnvilZksNamespace::new(l1_sidecar.clone()).into_rpc())
61 .unwrap();
62 rpc.merge(EvmNamespace::new(node.clone()).into_rpc())
63 .unwrap();
64 rpc.merge(DebugNamespace::new(node.clone()).into_rpc())
65 .unwrap();
66 rpc.merge(NetNamespace::new(node.clone()).into_rpc())
67 .unwrap();
68 rpc.merge(ConfigNamespace::new(node.clone()).into_rpc())
69 .unwrap();
70 rpc.merge(ZksNamespace::new(node, l1_sidecar).into_rpc())
71 .unwrap();
72 rpc.merge(Web3Namespace.into_rpc()).unwrap();
73 rpc
74 }
75
76 pub async fn build(self, addr: SocketAddr) -> Result<NodeServer, String> {
77 let cors_layers = tower::util::option_layer(self.cors_enabled.then(|| {
78 CorsLayer::new()
83 .allow_origin(self.allow_origin.clone())
84 .allow_headers([http::header::CONTENT_TYPE])
85 .allow_methods([Method::GET, Method::POST])
86 }));
87 let health_api_layer = tower::util::option_layer(
88 self.health_api_enabled
89 .then(|| ProxyGetRequestLayer::new("/health", "web3_clientVersion").unwrap()),
90 );
91 let server_builder = ServerBuilder::default()
92 .http_only()
93 .set_http_middleware(
94 tower::ServiceBuilder::new()
95 .layer(cors_layers)
96 .layer(health_api_layer),
97 )
98 .set_rpc_middleware(RpcServiceBuilder::new().rpc_logger(100))
99 .set_rpc_middleware(
100 RpcServiceBuilder::new().layer_fn(|service| TelemetryReporter { service }),
101 );
102
103 match server_builder.build(addr).await {
104 Ok(server) => {
105 let local_addr = server.local_addr().unwrap();
106 let rpc = Self::default_rpc(self.node, self.l1_sidecar);
107 Ok(NodeServer {
110 local_addr,
111 run_fn: Box::new(move || server.start(rpc)),
112 })
113 }
114 Err(e) => Err(format!("Failed to bind to address {}: {}", addr, e)),
115 }
116 }
117}
118
119pub struct NodeServer {
120 local_addr: SocketAddr,
121 run_fn: Box<dyn FnOnce() -> ServerHandle>,
122}
123
124impl NodeServer {
125 pub fn local_addr(&self) -> SocketAddr {
127 self.local_addr
128 }
129
130 pub fn run(self) -> ServerHandle {
136 (self.run_fn)()
137 }
138}
139
140#[derive(Clone)]
141pub struct TelemetryReporter<S> {
142 service: S,
143}
144
145impl<'a, S> RpcServiceT<'a> for TelemetryReporter<S>
146where
147 S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
148{
149 type Future = BoxFuture<'a, MethodResponse>;
150
151 fn call(&self, req: Request<'a>) -> Self::Future {
152 let service = self.service.clone();
153 let telemetry_opt = get_telemetry();
154
155 async move {
156 if let Some(tel) = telemetry_opt {
157 let method = req.method_name();
158 if method.starts_with("anvil_") || method.starts_with("config_") {
160 let _ = tel
161 .track_event(
162 "rpc_call",
163 TelemetryProps::new().insert("method", Some(method)).take(),
164 )
165 .await;
166 }
167 }
168 service.call(req).await
169 }
170 .boxed()
171 }
172}