moonbeam_service/lazy_loading/
rpc_client.rs

1// Copyright 2025 Moonbeam foundation
2// This file is part of Moonbeam.
3
4// Moonbeam is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Moonbeam is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
16
17use cumulus_primitives_core::BlockT;
18use fc_rpc_v2_api::types::H256;
19use jsonrpsee::http_client::HttpClient;
20use moonbeam_core_primitives::BlockNumber;
21use serde::de::DeserializeOwned;
22use sp_api::__private::HeaderT;
23use sp_rpc::list::ListOrValue;
24use sp_rpc::number::NumberOrHex;
25use sp_runtime::generic::SignedBlock;
26use sp_storage::{StorageData, StorageKey};
27use std::future::Future;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::sync::Arc;
30use std::time::Duration;
31use tokio_retry::strategy::FixedInterval;
32use tokio_retry::Retry;
33
34#[derive(Debug, Clone)]
35pub struct RPC {
36	http_client: HttpClient,
37	delay_between_requests_ms: u32,
38	max_retries_per_request: u32,
39	counter: Arc<AtomicU64>,
40}
41
42impl RPC {
43	pub fn new(
44		http_client: HttpClient,
45		delay_between_requests_ms: u32,
46		max_retries_per_request: u32,
47	) -> Self {
48		Self {
49			http_client,
50			delay_between_requests_ms,
51			max_retries_per_request,
52			counter: Default::default(),
53		}
54	}
55	pub fn system_chain(&self) -> Result<String, jsonrpsee::core::ClientError> {
56		let request = &|| {
57			substrate_rpc_client::SystemApi::<H256, BlockNumber>::system_chain(&self.http_client)
58		};
59
60		self.block_on(request)
61	}
62
63	pub fn system_properties(
64		&self,
65	) -> Result<sc_chain_spec::Properties, jsonrpsee::core::ClientError> {
66		let request = &|| {
67			substrate_rpc_client::SystemApi::<H256, BlockNumber>::system_properties(
68				&self.http_client,
69			)
70		};
71
72		self.block_on(request)
73	}
74
75	pub fn block<Block, Hash: Clone>(
76		&self,
77		hash: Option<Hash>,
78	) -> Result<Option<SignedBlock<Block>>, jsonrpsee::core::ClientError>
79	where
80		Block: BlockT + DeserializeOwned,
81		Hash: 'static + Send + Sync + sp_runtime::Serialize + DeserializeOwned,
82	{
83		let request = &|| {
84			substrate_rpc_client::ChainApi::<
85				BlockNumber,
86				Hash,
87				Block::Header,
88				SignedBlock<Block>,
89			>::block(&self.http_client, hash.clone())
90		};
91
92		self.block_on(request)
93	}
94
95	pub fn block_hash<Block: BlockT + DeserializeOwned>(
96		&self,
97		block_number: Option<<Block::Header as HeaderT>::Number>,
98	) -> Result<Option<Block::Hash>, jsonrpsee::core::ClientError> {
99		let request = &|| {
100			substrate_rpc_client::ChainApi::<
101				<Block::Header as HeaderT>::Number,
102				Block::Hash,
103				Block::Header,
104				SignedBlock<Block>,
105			>::block_hash(
106				&self.http_client,
107				block_number.map(|n| ListOrValue::Value(NumberOrHex::Hex(n.into()))),
108			)
109		};
110
111		self.block_on(request).map(|ok| match ok {
112			ListOrValue::List(v) => v.get(0).map_or(None, |some| *some),
113			ListOrValue::Value(v) => v,
114		})
115	}
116
117	pub fn header<Block: BlockT + DeserializeOwned>(
118		&self,
119		hash: Option<Block::Hash>,
120	) -> Result<Option<Block::Header>, jsonrpsee::core::ClientError> {
121		let request = &|| {
122			substrate_rpc_client::ChainApi::<
123				BlockNumber,
124				Block::Hash,
125				Block::Header,
126				SignedBlock<Block>,
127			>::header(&self.http_client, hash)
128		};
129
130		self.block_on(request)
131	}
132
133	pub fn storage_hash<
134		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize,
135	>(
136		&self,
137		key: StorageKey,
138		at: Option<Hash>,
139	) -> Result<Option<Hash>, jsonrpsee::core::ClientError> {
140		let request = &|| {
141			substrate_rpc_client::StateApi::<Hash>::storage_hash(
142				&self.http_client,
143				key.clone(),
144				at.clone(),
145			)
146		};
147
148		self.block_on(request)
149	}
150
151	pub fn storage<
152		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize + core::fmt::Debug,
153	>(
154		&self,
155		key: StorageKey,
156		at: Option<Hash>,
157	) -> Result<Option<StorageData>, jsonrpsee::core::ClientError> {
158		let request = &|| {
159			substrate_rpc_client::StateApi::<Hash>::storage(
160				&self.http_client,
161				key.clone(),
162				at.clone(),
163			)
164		};
165
166		self.block_on(request)
167	}
168
169	pub fn storage_keys_paged<
170		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize,
171	>(
172		&self,
173		key: Option<StorageKey>,
174		count: u32,
175		start_key: Option<StorageKey>,
176		at: Option<Hash>,
177	) -> Result<Vec<sp_state_machine::StorageKey>, jsonrpsee::core::ClientError> {
178		let request = &|| {
179			substrate_rpc_client::StateApi::<Hash>::storage_keys_paged(
180				&self.http_client,
181				key.clone(),
182				count.clone(),
183				start_key.clone(),
184				at.clone(),
185			)
186		};
187		let result = self.block_on(request);
188
189		match result {
190			Ok(result) => Ok(result.iter().map(|item| item.0.clone()).collect()),
191			Err(err) => Err(err),
192		}
193	}
194
195	pub fn transaction_by_hash(
196		&self,
197		eth_transaction_hash: &H256,
198	) -> Result<Option<fc_rpc_v2_api::types::Transaction>, jsonrpsee::core::ClientError> {
199		let request = &|| {
200			fc_rpc_v2_api::eth::EthTransactionApiClient::transaction_by_hash(
201				&self.http_client,
202				eth_transaction_hash.clone(),
203			)
204		};
205
206		self.block_on(request)
207	}
208
209	pub fn block_by_hash(
210		&self,
211		eth_block_hash: &H256,
212		full: bool,
213	) -> Result<Option<fc_rpc_v2_api::types::Block>, jsonrpsee::core::ClientError> {
214		let request = &|| {
215			fc_rpc_v2_api::eth::EthBlockApiClient::block_by_hash(
216				&self.http_client,
217				eth_block_hash.clone(),
218				full,
219			)
220		};
221
222		self.block_on(request)
223	}
224
225	fn block_on<F, T, E>(&self, f: &dyn Fn() -> F) -> Result<T, E>
226	where
227		F: Future<Output = Result<T, E>>,
228	{
229		use tokio::runtime::Handle;
230
231		let id = self.counter.fetch_add(1, Ordering::SeqCst);
232		let start = std::time::Instant::now();
233
234		tokio::task::block_in_place(move || {
235			Handle::current().block_on(async move {
236				let delay_between_requests =
237					Duration::from_millis(self.delay_between_requests_ms.into());
238
239				let start_req = std::time::Instant::now();
240				log::debug!(
241					target: super::LAZY_LOADING_LOG_TARGET,
242					"Sending request: {}",
243					id
244				);
245
246				// Explicit request delay, to avoid getting 429 errors
247				let _ = tokio::time::sleep(delay_between_requests).await;
248
249				// Retry request in case of failure
250				// The maximum number of retries is specified by `self.max_retries_per_request`
251				let retry_strategy = FixedInterval::new(delay_between_requests)
252					.take(self.max_retries_per_request as usize);
253				let result = Retry::spawn(retry_strategy, f).await;
254
255				log::debug!(
256					target: super::LAZY_LOADING_LOG_TARGET,
257					"Completed request (id: {}, successful: {}, elapsed_time: {:?}, query_time: {:?})",
258					id,
259					result.is_ok(),
260					start.elapsed(),
261					start_req.elapsed()
262				);
263
264				result
265			})
266		})
267	}
268}