moonbeam_service/lazy_loading/
rpc_client.rs1use cumulus_primitives_core::BlockT;
18use fc_rpc_v2_api::types::H256;
19use jsonrpsee::http_client::HttpClient;
20use moonbeam_core_primitives::BlockNumber;
21use serde::de::DeserializeOwned;
22use sp_api::__private::HeaderT;
23use sp_rpc::list::ListOrValue;
24use sp_rpc::number::NumberOrHex;
25use sp_runtime::generic::SignedBlock;
26use sp_storage::{StorageData, StorageKey};
27use std::future::Future;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::sync::Arc;
30use std::time::Duration;
31use tokio_retry::strategy::FixedInterval;
32use tokio_retry::Retry;
33
34#[derive(Debug, Clone)]
35pub struct RPC {
36 http_client: HttpClient,
37 delay_between_requests_ms: u32,
38 max_retries_per_request: u32,
39 counter: Arc<AtomicU64>,
40}
41
42impl RPC {
43 pub fn new(
44 http_client: HttpClient,
45 delay_between_requests_ms: u32,
46 max_retries_per_request: u32,
47 ) -> Self {
48 Self {
49 http_client,
50 delay_between_requests_ms,
51 max_retries_per_request,
52 counter: Default::default(),
53 }
54 }
55 pub fn system_chain(&self) -> Result<String, jsonrpsee::core::ClientError> {
56 let request = &|| {
57 substrate_rpc_client::SystemApi::<H256, BlockNumber>::system_chain(&self.http_client)
58 };
59
60 self.block_on(request)
61 }
62
63 pub fn system_properties(
64 &self,
65 ) -> Result<sc_chain_spec::Properties, jsonrpsee::core::ClientError> {
66 let request = &|| {
67 substrate_rpc_client::SystemApi::<H256, BlockNumber>::system_properties(
68 &self.http_client,
69 )
70 };
71
72 self.block_on(request)
73 }
74
75 pub fn block<Block, Hash: Clone>(
76 &self,
77 hash: Option<Hash>,
78 ) -> Result<Option<SignedBlock<Block>>, jsonrpsee::core::ClientError>
79 where
80 Block: BlockT + DeserializeOwned,
81 Hash: 'static + Send + Sync + sp_runtime::Serialize + DeserializeOwned,
82 {
83 let request = &|| {
84 substrate_rpc_client::ChainApi::<
85 BlockNumber,
86 Hash,
87 Block::Header,
88 SignedBlock<Block>,
89 >::block(&self.http_client, hash.clone())
90 };
91
92 self.block_on(request)
93 }
94
95 pub fn block_hash<Block: BlockT + DeserializeOwned>(
96 &self,
97 block_number: Option<<Block::Header as HeaderT>::Number>,
98 ) -> Result<Option<Block::Hash>, jsonrpsee::core::ClientError> {
99 let request = &|| {
100 substrate_rpc_client::ChainApi::<
101 <Block::Header as HeaderT>::Number,
102 Block::Hash,
103 Block::Header,
104 SignedBlock<Block>,
105 >::block_hash(
106 &self.http_client,
107 block_number.map(|n| ListOrValue::Value(NumberOrHex::Hex(n.into()))),
108 )
109 };
110
111 self.block_on(request).map(|ok| match ok {
112 ListOrValue::List(v) => v.get(0).map_or(None, |some| *some),
113 ListOrValue::Value(v) => v,
114 })
115 }
116
117 pub fn header<Block: BlockT + DeserializeOwned>(
118 &self,
119 hash: Option<Block::Hash>,
120 ) -> Result<Option<Block::Header>, jsonrpsee::core::ClientError> {
121 let request = &|| {
122 substrate_rpc_client::ChainApi::<
123 BlockNumber,
124 Block::Hash,
125 Block::Header,
126 SignedBlock<Block>,
127 >::header(&self.http_client, hash)
128 };
129
130 self.block_on(request)
131 }
132
133 pub fn storage_hash<
134 Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize,
135 >(
136 &self,
137 key: StorageKey,
138 at: Option<Hash>,
139 ) -> Result<Option<Hash>, jsonrpsee::core::ClientError> {
140 let request = &|| {
141 substrate_rpc_client::StateApi::<Hash>::storage_hash(
142 &self.http_client,
143 key.clone(),
144 at.clone(),
145 )
146 };
147
148 self.block_on(request)
149 }
150
151 pub fn storage<
152 Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize + core::fmt::Debug,
153 >(
154 &self,
155 key: StorageKey,
156 at: Option<Hash>,
157 ) -> Result<Option<StorageData>, jsonrpsee::core::ClientError> {
158 let request = &|| {
159 substrate_rpc_client::StateApi::<Hash>::storage(
160 &self.http_client,
161 key.clone(),
162 at.clone(),
163 )
164 };
165
166 self.block_on(request)
167 }
168
169 pub fn storage_keys_paged<
170 Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize,
171 >(
172 &self,
173 key: Option<StorageKey>,
174 count: u32,
175 start_key: Option<StorageKey>,
176 at: Option<Hash>,
177 ) -> Result<Vec<sp_state_machine::StorageKey>, jsonrpsee::core::ClientError> {
178 let request = &|| {
179 substrate_rpc_client::StateApi::<Hash>::storage_keys_paged(
180 &self.http_client,
181 key.clone(),
182 count.clone(),
183 start_key.clone(),
184 at.clone(),
185 )
186 };
187 let result = self.block_on(request);
188
189 match result {
190 Ok(result) => Ok(result.iter().map(|item| item.0.clone()).collect()),
191 Err(err) => Err(err),
192 }
193 }
194
195 pub fn transaction_by_hash(
196 &self,
197 eth_transaction_hash: &H256,
198 ) -> Result<Option<fc_rpc_v2_api::types::Transaction>, jsonrpsee::core::ClientError> {
199 let request = &|| {
200 fc_rpc_v2_api::eth::EthTransactionApiClient::transaction_by_hash(
201 &self.http_client,
202 eth_transaction_hash.clone(),
203 )
204 };
205
206 self.block_on(request)
207 }
208
209 pub fn block_by_hash(
210 &self,
211 eth_block_hash: &H256,
212 full: bool,
213 ) -> Result<Option<fc_rpc_v2_api::types::Block>, jsonrpsee::core::ClientError> {
214 let request = &|| {
215 fc_rpc_v2_api::eth::EthBlockApiClient::block_by_hash(
216 &self.http_client,
217 eth_block_hash.clone(),
218 full,
219 )
220 };
221
222 self.block_on(request)
223 }
224
225 fn block_on<F, T, E>(&self, f: &dyn Fn() -> F) -> Result<T, E>
226 where
227 F: Future<Output = Result<T, E>>,
228 {
229 use tokio::runtime::Handle;
230
231 let id = self.counter.fetch_add(1, Ordering::SeqCst);
232 let start = std::time::Instant::now();
233
234 tokio::task::block_in_place(move || {
235 Handle::current().block_on(async move {
236 let delay_between_requests =
237 Duration::from_millis(self.delay_between_requests_ms.into());
238
239 let start_req = std::time::Instant::now();
240 log::debug!(
241 target: super::LAZY_LOADING_LOG_TARGET,
242 "Sending request: {}",
243 id
244 );
245
246 let _ = tokio::time::sleep(delay_between_requests).await;
248
249 let retry_strategy = FixedInterval::new(delay_between_requests)
252 .take(self.max_retries_per_request as usize);
253 let result = Retry::spawn(retry_strategy, f).await;
254
255 log::debug!(
256 target: super::LAZY_LOADING_LOG_TARGET,
257 "Completed request (id: {}, successful: {}, elapsed_time: {:?}, query_time: {:?})",
258 id,
259 result.is_ok(),
260 start.elapsed(),
261 start_req.elapsed()
262 );
263
264 result
265 })
266 })
267 }
268}