moonbeam_rpc_trace/
lib.rs

1// Copyright 2019-2025 PureStake Inc.
2// This file is part of Moonbeam.
3
4// Moonbeam is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Moonbeam is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
16
//! `trace_filter` RPC handler and its associated service task.
//! The RPC handler relies on `CacheTask`, which provides a future that must be run inside a tokio
//! executor.
//!
//! The implementation is composed of multiple tasks:
//! - Many calls to the RPC handler `Trace::filter`, each communicating with the main task.
//! - A main `CacheTask` managing the cache and the communication between tasks.
//! - For each traced block, an async task responsible for waiting for a permit, spawning a
//!   blocking task and waiting for its result, then sending it to the main `CacheTask`.
26
27use futures::{select, stream::FuturesUnordered, FutureExt, StreamExt};
28use std::{collections::BTreeMap, future::Future, marker::PhantomData, sync::Arc, time::Duration};
29use tokio::{
30	sync::{mpsc, oneshot, Semaphore},
31	time::sleep,
32};
33use tracing::{instrument, Instrument};
34
35use sc_client_api::backend::{Backend, StateBackend, StorageProvider};
36use sc_utils::mpsc::TracingUnboundedSender;
37use sp_api::{ApiExt, Core, ProvideRuntimeApi};
38use sp_block_builder::BlockBuilder;
39use sp_blockchain::{
40	Backend as BlockchainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
41};
42use sp_runtime::traits::{BlakeTwo256, Block as BlockT, Header as HeaderT};
43use substrate_prometheus_endpoint::{
44	register, Counter, PrometheusError, Registry as PrometheusRegistry, U64,
45};
46
47use ethereum_types::H256;
48use fc_storage::StorageOverride;
49use fp_rpc::EthereumRuntimeRPCApi;
50
51use moonbeam_client_evm_tracing::{
52	formatters::ResponseFormatter,
53	types::block::{self, TransactionTrace},
54};
55pub use moonbeam_rpc_core_trace::{FilterRequest, TraceServer};
56use moonbeam_rpc_core_types::{RequestBlockId, RequestBlockTag};
57use moonbeam_rpc_primitives_debug::DebugRuntimeApi;
58
/// Outcome of tracing transactions: the list of traces on success, or a human-readable error
/// message on failure.
type TxsTraceRes = Result<Vec<TransactionTrace>, String>;
60
/// RPC handler. Will communicate with a `CacheTask` through a `CacheRequester`.
pub struct Trace<B, C> {
	/// Ties the handler to the block type `B` without storing a value of it.
	_phantom: PhantomData<B>,
	/// Client used to resolve block tags and heights into block hashes.
	client: Arc<C>,
	/// Handle used to send requests to the cache task.
	requester: CacheRequester,
	/// Upper bound on the `count` a `trace_filter` request may ask for; also used as the
	/// default when the request gives no `count`.
	max_count: u32,
}
68
69impl<B, C> Clone for Trace<B, C> {
70	fn clone(&self) -> Self {
71		Self {
72			_phantom: PhantomData,
73			client: Arc::clone(&self.client),
74			requester: self.requester.clone(),
75			max_count: self.max_count,
76		}
77	}
78}
79
80impl<B, C> Trace<B, C>
81where
82	B: BlockT<Hash = H256> + Send + Sync + 'static,
83	B::Header: HeaderT<Number = u32>,
84	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
85	C: Send + Sync + 'static,
86{
87	/// Create a new RPC handler.
88	pub fn new(client: Arc<C>, requester: CacheRequester, max_count: u32) -> Self {
89		Self {
90			client,
91			requester,
92			max_count,
93			_phantom: PhantomData,
94		}
95	}
96
97	/// Convert an optional block ID (number or tag) to a block height.
98	fn block_id(&self, id: Option<RequestBlockId>) -> Result<u32, &'static str> {
99		match id {
100			Some(RequestBlockId::Number(n)) => Ok(n),
101			None | Some(RequestBlockId::Tag(RequestBlockTag::Latest)) => {
102				Ok(self.client.info().best_number)
103			}
104			Some(RequestBlockId::Tag(RequestBlockTag::Earliest)) => Ok(0),
105			Some(RequestBlockId::Tag(RequestBlockTag::Finalized)) => {
106				Ok(self.client.info().finalized_number)
107			}
108			Some(RequestBlockId::Tag(RequestBlockTag::Pending)) => {
109				Err("'pending' is not supported")
110			}
111			Some(RequestBlockId::Hash(_)) => Err("Block hash not supported"),
112		}
113	}
114
115	/// `trace_filter` endpoint (wrapped in the trait implementation with futures compatibility)
116	async fn filter(self, req: FilterRequest) -> TxsTraceRes {
117		let from_block = self.block_id(req.from_block)?;
118		let to_block = self.block_id(req.to_block)?;
119		let block_heights = from_block..=to_block;
120
121		let count = req.count.unwrap_or(self.max_count);
122		if count > self.max_count {
123			return Err(format!(
124				"count ({}) can't be greater than maximum ({})",
125				count, self.max_count
126			));
127		}
128
129		// Build a list of all the Substrate block hashes that need to be traced.
130		let mut block_hashes = vec![];
131		for block_height in block_heights {
132			if block_height == 0 {
133				continue; // no traces for genesis block.
134			}
135
136			let block_hash = self
137				.client
138				.hash(block_height)
139				.map_err(|e| {
140					format!(
141						"Error when fetching block {} header : {:?}",
142						block_height, e
143					)
144				})?
145				.ok_or_else(|| format!("Block with height {} don't exist", block_height))?;
146
147			block_hashes.push(block_hash);
148		}
149
150		// Start a batch with these blocks.
151		let batch_id = self.requester.start_batch(block_hashes.clone()).await?;
152		// Fetch all the traces. It is done in another function to simplify error handling and allow
153		// to call the following `stop_batch` regardless of the result. This is important for the
154		// cache cleanup to work properly.
155		let res = self.fetch_traces(req, &block_hashes, count as usize).await;
156		// Stop the batch, allowing the cache task to remove useless non-started block traces and
157		// start the expiration delay.
158		self.requester.stop_batch(batch_id).await;
159
160		res
161	}
162
163	async fn fetch_traces(
164		&self,
165		req: FilterRequest,
166		block_hashes: &[H256],
167		count: usize,
168	) -> TxsTraceRes {
169		let from_address = req.from_address.unwrap_or_default();
170		let to_address = req.to_address.unwrap_or_default();
171
172		let mut traces_amount: i64 = -(req.after.unwrap_or(0) as i64);
173		let mut traces = vec![];
174
175		for &block_hash in block_hashes {
176			// Request the traces of this block to the cache service.
177			// This will resolve quickly if the block is already cached, or wait until the block
178			// has finished tracing.
179			let block_traces = self.requester.get_traces(block_hash).await?;
180
181			// Filter addresses.
182			let mut block_traces: Vec<_> = block_traces
183				.iter()
184				.filter(|trace| match trace.action {
185					block::TransactionTraceAction::Call { from, to, .. } => {
186						(from_address.is_empty() || from_address.contains(&from))
187							&& (to_address.is_empty() || to_address.contains(&to))
188					}
189					block::TransactionTraceAction::Create { from, .. } => {
190						(from_address.is_empty() || from_address.contains(&from))
191							&& to_address.is_empty()
192					}
193					block::TransactionTraceAction::Suicide { address, .. } => {
194						(from_address.is_empty() || from_address.contains(&address))
195							&& to_address.is_empty()
196					}
197				})
198				.cloned()
199				.collect();
200
201			// Don't insert anything if we're still before "after"
202			traces_amount += block_traces.len() as i64;
203			if traces_amount > 0 {
204				let traces_amount = traces_amount as usize;
205				// If the current Vec of traces is across the "after" marker,
206				// we skip some elements of it.
207				if traces_amount < block_traces.len() {
208					let skip = block_traces.len() - traces_amount;
209					block_traces = block_traces.into_iter().skip(skip).collect();
210				}
211
212				traces.append(&mut block_traces);
213
214				// If we go over "count" (the limit), we trim and exit the loop,
215				// unless we used the default maximum, in which case we return an error.
216				if traces_amount >= count {
217					if req.count.is_none() {
218						return Err(format!(
219							"the amount of traces goes over the maximum ({}), please use 'after' \
220							and 'count' in your request",
221							self.max_count
222						));
223					}
224
225					traces = traces.into_iter().take(count).collect();
226					break;
227				}
228			}
229		}
230
231		Ok(traces)
232	}
233}
234
235#[jsonrpsee::core::async_trait]
236impl<B, C> TraceServer for Trace<B, C>
237where
238	B: BlockT<Hash = H256> + Send + Sync + 'static,
239	B::Header: HeaderT<Number = u32>,
240	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
241	C: Send + Sync + 'static,
242{
243	async fn filter(
244		&self,
245		filter: FilterRequest,
246	) -> jsonrpsee::core::RpcResult<Vec<TransactionTrace>> {
247		self.clone()
248			.filter(filter)
249			.await
250			.map_err(fc_rpc::internal_err)
251	}
252}
253
/// An opaque batch ID, handed out by the cache task when a batch is started and given back to
/// it to stop that batch.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct CacheBatchId(u64);
257
/// Requests the cache task can accept.
enum CacheRequest {
	/// Request to start caching the provided list of blocks.
	/// The task will add the blocks to its pool and immediately return a new batch ID.
	StartBatch {
		/// Returns the ID of the batch for cancellation.
		sender: oneshot::Sender<CacheBatchId>,
		/// List of block hashes to trace.
		blocks: Vec<H256>,
	},
	/// Fetch the traces for the given block hash.
	/// The task will answer only when it has processed this block.
	GetTraces {
		/// Returns the array of traces or an error.
		sender: oneshot::Sender<TxsTraceRes>,
		/// Hash of the block.
		block: H256,
	},
	/// Notify the cache that it can stop the batch with that ID. Any block contained only in
	/// this batch and still not started will be discarded.
	StopBatch { batch_id: CacheBatchId },
}
280
/// Handle allowing to interact with the cache task. It wraps the sending half of the channel
/// the cache task reads its requests from, and can be cloned freely.
#[derive(Clone)]
pub struct CacheRequester(TracingUnboundedSender<CacheRequest>);
284
285impl CacheRequester {
286	/// Request to start caching the provided range of blocks.
287	/// The task will add to blocks to its pool and immediately return the batch ID.
288	#[instrument(skip(self))]
289	pub async fn start_batch(&self, blocks: Vec<H256>) -> Result<CacheBatchId, String> {
290		let (response_tx, response_rx) = oneshot::channel();
291		let sender = self.0.clone();
292
293		sender
294			.unbounded_send(CacheRequest::StartBatch {
295				sender: response_tx,
296				blocks,
297			})
298			.map_err(|e| {
299				format!(
300					"Failed to send request to the trace cache task. Error : {:?}",
301					e
302				)
303			})?;
304
305		response_rx.await.map_err(|e| {
306			format!(
307				"Trace cache task closed the response channel. Error : {:?}",
308				e
309			)
310		})
311	}
312
313	/// Fetch the traces for given block hash.
314	/// The task will answer only when it has processed this block.
315	/// The block should be part of a batch first. If no batch has requested the block it will
316	/// return an error.
317	#[instrument(skip(self))]
318	pub async fn get_traces(&self, block: H256) -> TxsTraceRes {
319		let (response_tx, response_rx) = oneshot::channel();
320		let sender = self.0.clone();
321
322		sender
323			.unbounded_send(CacheRequest::GetTraces {
324				sender: response_tx,
325				block,
326			})
327			.map_err(|e| {
328				format!(
329					"Failed to send request to the trace cache task. Error : {:?}",
330					e
331				)
332			})?;
333
334		response_rx
335			.await
336			.map_err(|e| {
337				format!(
338					"Trace cache task closed the response channel. Error : {:?}",
339					e
340				)
341			})?
342			.map_err(|e| format!("Failed to replay block. Error : {:?}", e))
343	}
344
345	/// Notify the cache that it can stop the batch with that ID. Any block contained only in
346	/// this batch and still in the waiting pool will be discarded.
347	#[instrument(skip(self))]
348	pub async fn stop_batch(&self, batch_id: CacheBatchId) {
349		let sender = self.0.clone();
350
351		// Here we don't care if the request has been accepted or refused, the caller can't
352		// do anything with it.
353		let _ = sender
354			.unbounded_send(CacheRequest::StopBatch { batch_id })
355			.map_err(|e| {
356				format!(
357					"Failed to send request to the trace cache task. Error : {:?}",
358					e
359				)
360			});
361	}
362}
363
/// Data stored for each block in the cache.
/// `active_batch_count` represents the number of batches using this
/// block. It will increase immediately when a batch is created, but will be
/// decreased only after the batch ends and its expiration delay passes.
/// It allows to keep the data in the cache for following requests that would use
/// this block, which is important to handle pagination efficiently.
struct CacheBlock {
	/// Number of batches currently referencing this block.
	active_batch_count: usize,
	/// Whether the block is still waiting for / undergoing tracing, or already traced.
	state: CacheBlockState,
}
374
/// State of a cached block. It can either be pooled to be traced or cached.
enum CacheBlockState {
	/// Block has been added to the pool of blocks to be replayed.
	/// It may be currently waiting to be replayed or being replayed.
	Pooled {
		/// Set once the tracing task of this block got its permit and started tracing.
		started: bool,
		/// Multiple requests might query the same block while it is pooled to be
		/// traced. Their response channels are stored here, and the result will be
		/// sent to all of them when the tracing is finished.
		waiting_requests: Vec<oneshot::Sender<TxsTraceRes>>,
		/// Channel used to unqueue a tracing that has not yet started.
		/// A tracing will be unqueued if it has not yet been started and the last batch
		/// needing this block is ended (ignoring the expiration delay).
		/// It is not used directly, but dropping it will wake up the receiver.
		#[allow(dead_code)]
		unqueue_sender: oneshot::Sender<()>,
	},
	/// Tracing has been completed and the result is available. No Runtime API call
	/// will be needed until this block cache is removed.
	Cached { traces: TxsTraceRes },
}
396
/// Tracing a block is done in a separate tokio blocking task to avoid clogging the async threads.
/// For this reason a channel using this type is used by the tracing task to communicate with the
/// main cache task.
enum BlockingTaskMessage {
	/// Notify that the tracing of this block has started, as the tracing task got a permit from
	/// the semaphore. This is used to prevent the deletion of a cache entry for a block that has
	/// started being traced.
	Started { block_hash: H256 },
	/// The tracing is finished and the result is sent to the main task.
	Finished {
		/// Hash of the block that was traced.
		block_hash: H256,
		/// Traces of the block, or the error that made the tracing fail.
		result: TxsTraceRes,
	},
}
411
/// Type wrapper for the cache task, generic over the Client, Block and Backend types.
/// It holds the inner state of the task; the channels and futures polled by the main loop
/// live in the future built by `create`.
pub struct CacheTask<B, C, BE> {
	/// Client handed to the block tracing tasks.
	client: Arc<C>,
	/// Backend handed to the block tracing tasks.
	backend: Arc<BE>,
	/// Semaphore a tracing task must get a permit from before tracing its block.
	blocking_permits: Arc<Semaphore>,
	/// Cache entries, indexed by block hash.
	cached_blocks: BTreeMap<H256, CacheBlock>,
	/// Blocks of each batch, indexed by batch ID. An entry stays until the batch expires.
	batches: BTreeMap<u64, Vec<H256>>,
	/// ID that will be given to the next started batch.
	next_batch_id: u64,
	/// Cache hit/miss counters, if a Prometheus registry was provided and registration worked.
	metrics: Option<Metrics>,
	_phantom: PhantomData<B>,
}
423
424impl<B, C, BE> CacheTask<B, C, BE>
425where
426	BE: Backend<B> + 'static,
427	BE::State: StateBackend<BlakeTwo256>,
428	C: ProvideRuntimeApi<B>,
429	C: StorageProvider<B, BE>,
430	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
431	C: Send + Sync + 'static,
432	B: BlockT<Hash = H256> + Send + Sync + 'static,
433	B::Header: HeaderT<Number = u32>,
434	C::Api: BlockBuilder<B>,
435	C::Api: DebugRuntimeApi<B>,
436	C::Api: EthereumRuntimeRPCApi<B>,
437	C::Api: ApiExt<B>,
438{
	/// Create a new cache task.
	///
	/// Returns a Future that needs to be added to a tokio executor, and a handle allowing to
	/// send requests to the task.
	///
	/// - `client`, `backend`, `overrides`: handed to the tracing task of each block.
	/// - `cache_duration`: delay between the stop of a batch and its expiration.
	/// - `blocking_permits`: semaphore a tracing task must get a permit from before tracing.
	/// - `prometheus`: optional registry in which the cache metrics are registered. A failed
	///   registration is logged and the task runs without metrics.
	///
	/// The returned future completes once the request channel is closed.
	pub fn create(
		client: Arc<C>,
		backend: Arc<BE>,
		cache_duration: Duration,
		blocking_permits: Arc<Semaphore>,
		overrides: Arc<dyn StorageOverride<B>>,
		prometheus: Option<PrometheusRegistry>,
	) -> (impl Future<Output = ()>, CacheRequester) {
		// Communication with the outside world :
		let (requester_tx, mut requester_rx) =
			sc_utils::mpsc::tracing_unbounded("trace-filter-cache", 100_000);

		// Task running in the service.
		let task = async move {
			// The following variables are polled by the select! macro, and thus cannot be
			// part of Self without introducing borrowing issues.
			let mut batch_expirations = FuturesUnordered::new();
			// Channel the tracing tasks use to report back to this task.
			// NOTE(review): tokio's `mpsc::channel` panics when given a capacity of 0, which
			// would happen here if the semaphore had no available permit at this point.
			// Confirm callers always provide a semaphore with at least one free permit.
			let (blocking_tx, mut blocking_rx) =
				mpsc::channel(blocking_permits.available_permits() * 2);
			let metrics = if let Some(registry) = prometheus {
				match Metrics::register(&registry) {
					Ok(metrics) => Some(metrics),
					Err(err) => {
						log::error!(target: "tracing", "Failed to register metrics {err:?}");
						None
					}
				}
			} else {
				None
			};
			// Contains the inner state of the cache task, excluding the pooled futures/channels.
			// Having this object allows to refactor each event into its own function, simplifying
			// the main loop.
			let mut inner = Self {
				client,
				backend,
				blocking_permits,
				cached_blocks: BTreeMap::new(),
				batches: BTreeMap::new(),
				next_batch_id: 0,
				metrics,
				_phantom: Default::default(),
			};

			// Main event loop. This loop must not contain any direct .await, as we want to
			// react to events as fast as possible.
			loop {
				select! {
					// Requests coming from the RPC handlers.
					request = requester_rx.next() => {
						match request {
							// The request channel is closed: stop the task.
							None => break,
							Some(CacheRequest::StartBatch {sender, blocks})
								=> inner.request_start_batch(&blocking_tx, sender, blocks, overrides.clone()),
							Some(CacheRequest::GetTraces {sender, block})
								=> inner.request_get_traces(sender, block),
							Some(CacheRequest::StopBatch {batch_id}) => {
								// Cannot be refactored inside `request_stop_batch` because
								// it has an unnamable type :C
								batch_expirations.push(async move {
									sleep(cache_duration).await;
									batch_id
								});

								inner.request_stop_batch(batch_id);
							},
						}
					},
					// Progress reports coming from the block tracing tasks.
					message = blocking_rx.recv().fuse() => {
						match message {
							None => (),
							Some(BlockingTaskMessage::Started { block_hash })
								=> inner.blocking_started(block_hash),
							Some(BlockingTaskMessage::Finished { block_hash, result })
								=> inner.blocking_finished(block_hash, result),
						}
					},
					// Expiration delay of a stopped batch has elapsed.
					batch_id = batch_expirations.next() => {
						match batch_id {
							None => (),
							Some(batch_id) => inner.expired_batch(batch_id),
						}
					}
				}
			}
		}
		.instrument(tracing::debug_span!("trace_filter_cache"));

		(task, CacheRequester(requester_tx))
	}
532
533	/// Handle the creation of a batch.
534	/// Will start the tracing process for blocks that are not already in the cache.
535	#[instrument(skip(self, blocking_tx, sender, blocks, overrides))]
536	fn request_start_batch(
537		&mut self,
538		blocking_tx: &mpsc::Sender<BlockingTaskMessage>,
539		sender: oneshot::Sender<CacheBatchId>,
540		blocks: Vec<H256>,
541		overrides: Arc<dyn StorageOverride<B>>,
542	) {
543		tracing::trace!("Starting batch {}", self.next_batch_id);
544		self.batches.insert(self.next_batch_id, blocks.clone());
545
546		for block in blocks {
547			// The block is already in the cache, awesome!
548			if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
549				block_cache.active_batch_count += 1;
550				tracing::trace!(
551					"Cache hit for block {}, now used by {} batches.",
552					block,
553					block_cache.active_batch_count
554				);
555			}
556			// Otherwise we need to queue this block for tracing.
557			else {
558				tracing::trace!("Cache miss for block {}, pooling it for tracing.", block);
559
560				let blocking_permits = Arc::clone(&self.blocking_permits);
561				let (unqueue_sender, unqueue_receiver) = oneshot::channel();
562				let client = Arc::clone(&self.client);
563				let backend = Arc::clone(&self.backend);
564				let blocking_tx = blocking_tx.clone();
565				let overrides = overrides.clone();
566
567				// Spawn all block caching asynchronously.
568				// It will wait to obtain a permit, then spawn a blocking task.
569				// When the blocking task returns its result, it is sent
570				// thought a channel to the main task loop.
571				tokio::spawn(
572					async move {
573						tracing::trace!("Waiting for blocking permit or task cancellation");
574						let _permit = select!(
575							_ = unqueue_receiver.fuse() => {
576							tracing::trace!("Tracing of the block has been cancelled.");
577								return;
578							},
579							permit = blocking_permits.acquire().fuse() => permit,
580						);
581
582						// Warn the main task that block tracing as started, and
583						// this block cache entry should not be removed.
584						let _ = blocking_tx
585							.send(BlockingTaskMessage::Started { block_hash: block })
586							.await;
587
588						tracing::trace!("Start block tracing in a blocking task.");
589
590						// Perform block tracing in a tokio blocking task.
591						let result = async {
592							tokio::task::spawn_blocking(move || {
593								Self::cache_block(client, backend, block, overrides.clone())
594							})
595							.await
596							.map_err(|e| {
597								format!("Tracing Substrate block {} panicked : {:?}", block, e)
598							})?
599						}
600						.await
601						.map_err(|e| e.to_string());
602
603						tracing::trace!("Block tracing finished, sending result to main task.");
604
605						// Send a response to the main task.
606						let _ = blocking_tx
607							.send(BlockingTaskMessage::Finished {
608								block_hash: block,
609								result,
610							})
611							.await;
612					}
613					.instrument(tracing::trace_span!("Block tracing", block = %block)),
614				);
615
616				// Insert the block in the cache.
617				self.cached_blocks.insert(
618					block,
619					CacheBlock {
620						active_batch_count: 1,
621						state: CacheBlockState::Pooled {
622							started: false,
623							waiting_requests: vec![],
624							unqueue_sender,
625						},
626					},
627				);
628			}
629		}
630
631		// Respond with the batch ID.
632		let _ = sender.send(CacheBatchId(self.next_batch_id));
633
634		// Increase batch ID for the next request.
635		self.next_batch_id = self.next_batch_id.overflowing_add(1).0;
636	}
637
638	/// Handle a request to get the traces of the provided block.
639	/// - If the result is stored in the cache, it sends it immediately.
640	/// - If the block is currently being pooled, it is added to this block cache waiting list,
641	///   and all requests concerning this block will be satisfied when the tracing for this block
642	///   is finished.
643	/// - If this block is missing from the cache, it means no batch asked for it. All requested
644	///   blocks should be contained in a batch beforehand, and thus an error is returned.
645	#[instrument(skip(self))]
646	fn request_get_traces(&mut self, sender: oneshot::Sender<TxsTraceRes>, block: H256) {
647		if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
648			match &mut block_cache.state {
649				CacheBlockState::Pooled {
650					ref mut waiting_requests,
651					..
652				} => {
653					tracing::warn!(
654						"A request asked a pooled block ({}), adding it to the list of \
655						waiting requests.",
656						block
657					);
658					waiting_requests.push(sender);
659					if let Some(metrics) = &self.metrics {
660						metrics.tracing_cache_misses.inc();
661					}
662				}
663				CacheBlockState::Cached { traces, .. } => {
664					tracing::warn!(
665						"A request asked a cached block ({}), sending the traces directly.",
666						block
667					);
668					let _ = sender.send(traces.clone());
669					if let Some(metrics) = &self.metrics {
670						metrics.tracing_cache_hits.inc();
671					}
672				}
673			}
674		} else {
675			tracing::warn!(
676				"An RPC request asked to get a block ({}) which was not batched.",
677				block
678			);
679			let _ = sender.send(Err(format!(
680				"RPC request asked a block ({}) that was not batched",
681				block
682			)));
683		}
684	}
685
686	/// Handle a request to stop a batch.
687	/// For all blocks that needed to be traced, are only in this batch and not yet started, their
688	/// tracing is cancelled to save CPU-time and avoid attacks requesting large amount of blocks.
689	/// This batch data is not yet removed however. Instead a expiration delay timer is started
690	/// after which the data will indeed be cleared. (the code for that is in the main loop code
691	/// as it involved an unnamable type :C)
692	#[instrument(skip(self))]
693	fn request_stop_batch(&mut self, batch_id: CacheBatchId) {
694		tracing::trace!("Stopping batch {}", batch_id.0);
695		if let Some(blocks) = self.batches.get(&batch_id.0) {
696			for block in blocks {
697				let mut remove = false;
698
699				// We remove early the block cache if this batch is the last
700				// pooling this block.
701				if let Some(block_cache) = self.cached_blocks.get_mut(block) {
702					if block_cache.active_batch_count == 1
703						&& matches!(
704							block_cache.state,
705							CacheBlockState::Pooled { started: false, .. }
706						) {
707						remove = true;
708					}
709				}
710
711				if remove {
712					tracing::trace!("Pooled block {} is no longer requested.", block);
713					// Remove block from the cache. Drops the value,
714					// closing all the channels contained in it.
715					let _ = self.cached_blocks.remove(block);
716				}
717			}
718		}
719	}
720
721	/// A tracing blocking task notifies it got a permit and is starting the tracing.
722	/// This started status is stored to avoid removing this block entry.
723	#[instrument(skip(self))]
724	fn blocking_started(&mut self, block_hash: H256) {
725		if let Some(block_cache) = self.cached_blocks.get_mut(&block_hash) {
726			if let CacheBlockState::Pooled {
727				ref mut started, ..
728			} = block_cache.state
729			{
730				*started = true;
731			}
732		}
733	}
734
735	/// A tracing blocking task notifies it has finished the tracing and provide the result.
736	#[instrument(skip(self, result))]
737	fn blocking_finished(&mut self, block_hash: H256, result: TxsTraceRes) {
738		// In some cases it might be possible to receive traces of a block
739		// that has no entry in the cache because it was removed of the pool
740		// and received a permit concurrently. We just ignore it.
741		//
742		// TODO : Should we add it back ? Should it have an active_batch_count
743		// of 1 then ?
744		if let Some(block_cache) = self.cached_blocks.get_mut(&block_hash) {
745			if let CacheBlockState::Pooled {
746				ref mut waiting_requests,
747				..
748			} = block_cache.state
749			{
750				tracing::trace!(
751					"A new block ({}) has been traced, adding it to the cache and responding to \
752					{} waiting requests.",
753					block_hash,
754					waiting_requests.len()
755				);
756				// Send result in waiting channels
757				while let Some(channel) = waiting_requests.pop() {
758					let _ = channel.send(result.clone());
759				}
760
761				// Update cache entry
762				block_cache.state = CacheBlockState::Cached { traces: result };
763			}
764		}
765	}
766
767	/// A batch expiration delay timer has completed. It performs the cache cleaning for blocks
768	/// not longer used by other batches.
769	#[instrument(skip(self))]
770	fn expired_batch(&mut self, batch_id: CacheBatchId) {
771		if let Some(batch) = self.batches.remove(&batch_id.0) {
772			for block in batch {
773				// For each block of the batch, we remove it if it was the
774				// last batch containing it.
775				let mut remove = false;
776				if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
777					block_cache.active_batch_count -= 1;
778
779					if block_cache.active_batch_count == 0 {
780						remove = true;
781					}
782				}
783
784				if remove {
785					let _ = self.cached_blocks.remove(&block);
786				}
787			}
788		}
789	}
790
	/// (In blocking task) Use the Runtime API to trace the block.
	///
	/// Replays the Substrate block `substrate_hash` on top of its parent state with a call
	/// listener installed, then formats the recorded calls into `trace_filter` traces and
	/// attaches to each of them its Ethereum block hash, block number and transaction hash.
	///
	/// Returns an error message if the block data cannot be fetched, if a Runtime API version
	/// lookup or call fails, or if the recorded calls cannot be formatted.
	#[instrument(skip(client, backend, overrides))]
	fn cache_block(
		client: Arc<C>,
		backend: Arc<BE>,
		substrate_hash: H256,
		overrides: Arc<dyn StorageOverride<B>>,
	) -> TxsTraceRes {
		// Get Substrate block data.
		let api = client.runtime_api();
		let block_header = client
			.header(substrate_hash)
			.map_err(|e| {
				format!(
					"Error when fetching substrate block {} header : {:?}",
					substrate_hash, e
				)
			})?
			.ok_or_else(|| format!("Substrate block {} don't exist", substrate_hash))?;

		let height = *block_header.number();
		let substrate_parent_hash = *block_header.parent_hash();

		// Get Ethereum block data.
		let (eth_block, eth_transactions) = match (
			overrides.current_block(substrate_hash),
			overrides.current_transaction_statuses(substrate_hash),
		) {
			(Some(a), Some(b)) => (a, b),
			_ => {
				return Err(format!(
					"Failed to get Ethereum block data for Substrate block {}",
					substrate_hash
				))
			}
		};

		let eth_block_hash = eth_block.header.hash();
		let eth_tx_hashes = eth_transactions
			.iter()
			.map(|t| t.transaction_hash)
			.collect();

		// Get extrinsics (containing Ethereum ones)
		let extrinsics = backend
			.blockchain()
			.body(substrate_hash)
			.map_err(|e| {
				format!(
					"Blockchain error when fetching extrinsics of block {} : {:?}",
					height, e
				)
			})?
			.ok_or_else(|| format!("Could not find block {} when fetching extrinsics.", height))?;

		// Get DebugRuntimeApi version (queried at the parent block, on which the block is
		// replayed).
		let trace_api_version = if let Ok(Some(api_version)) =
			api.api_version::<dyn DebugRuntimeApi<B>>(substrate_parent_hash)
		{
			api_version
		} else {
			return Err("Runtime api version call failed (trace)".to_string());
		};

		// Trace the block. This closure is run below with the call listener installed.
		let f = || -> Result<_, String> {
			let result = if trace_api_version >= 5 {
				// Version 5 and above take the block header along with the extrinsics.
				api.trace_block(
					substrate_parent_hash,
					extrinsics,
					eth_tx_hashes,
					&block_header,
				)
			} else {
				// Get core runtime api version
				let core_api_version = if let Ok(Some(api_version)) =
					api.api_version::<dyn Core<B>>(substrate_parent_hash)
				{
					api_version
				} else {
					return Err("Runtime api version call failed (core)".to_string());
				};

				// Initialize block: calls the "on_initialize" hook on every pallet
				// in AllPalletsWithSystem
				// This was fine before pallet-message-queue because the XCM messages
				// were processed by the "setValidationData" inherent call and not on an
				// "on_initialize" hook, which runs before enabling XCM tracing
				if core_api_version >= 5 {
					api.initialize_block(substrate_parent_hash, &block_header)
						.map_err(|e| format!("Runtime api access error: {:?}", e))?;
				} else {
					#[allow(deprecated)]
					api.initialize_block_before_version_5(substrate_parent_hash, &block_header)
						.map_err(|e| format!("Runtime api access error: {:?}", e))?;
				}

				#[allow(deprecated)]
				api.trace_block_before_version_5(substrate_parent_hash, extrinsics, eth_tx_hashes)
			};

			// Outer error: the Runtime API call itself failed.
			// Inner error: the runtime reported an error while replaying the block.
			result
				.map_err(|e| format!("Blockchain error when replaying block {} : {:?}", height, e))?
				.map_err(|e| {
					tracing::warn!(
						target: "tracing",
						"Internal runtime error when replaying block {} : {:?}",
						height,
						e
					);
					format!(
						"Internal runtime error when replaying block {} : {:?}",
						height, e
					)
				})?;

			Ok(moonbeam_rpc_primitives_debug::Response::Block)
		};

		// Used to attach its transaction hash to each trace, from its position in the block.
		let eth_transactions_by_index: BTreeMap<u32, H256> = eth_transactions
			.iter()
			.map(|t| (t.transaction_index, t.transaction_hash))
			.collect();

		// Run the replay with the call listener installed.
		let mut proxy = moonbeam_client_evm_tracing::listeners::CallList::default();
		proxy.using(f)?;

		let traces: Vec<TransactionTrace> =
			moonbeam_client_evm_tracing::formatters::TraceFilter::format(proxy)
				.ok_or("Fail to format proxy")?
				.into_iter()
				.filter_map(|mut trace| {
					match eth_transactions_by_index.get(&trace.transaction_position) {
						Some(transaction_hash) => {
							trace.block_hash = eth_block_hash;
							trace.block_number = height;
							trace.transaction_hash = *transaction_hash;

							// Reformat error messages.
							if let block::TransactionTraceOutput::Error(ref mut error) =
								trace.output
							{
								if error.as_slice() == b"execution reverted" {
									*error = b"Reverted".to_vec();
								}
							}

							Some(trace)
						}
						// Traces that don't belong to a known Ethereum transaction are
						// logged and left out of the result.
						None => {
							log::warn!(
								target: "tracing",
								"A trace in block {} does not map to any known ethereum transaction. Trace: {:?}",
								height,
								trace,
							);
							None
						}
					}
				})
				.collect();

		Ok(traces)
	}
955}
956
/// Prometheus metrics for tracing.
#[derive(Clone)]
pub(crate) struct Metrics {
	/// Incremented when requested traces are served directly from the cache.
	tracing_cache_hits: Counter<U64>,
	/// Incremented when a request has to wait for a block that is still pooled for tracing.
	tracing_cache_misses: Counter<U64>,
}
963
964impl Metrics {
965	pub(crate) fn register(registry: &PrometheusRegistry) -> Result<Self, PrometheusError> {
966		Ok(Self {
967			tracing_cache_hits: register(
968				Counter::new("tracing_cache_hits", "Number of tracing cache hits.")?,
969				registry,
970			)?,
971			tracing_cache_misses: register(
972				Counter::new("tracing_cache_misses", "Number of tracing cache misses.")?,
973				registry,
974			)?,
975		})
976	}
977}