use futures::{select, stream::FuturesUnordered, FutureExt, StreamExt};
use std::{collections::BTreeMap, future::Future, marker::PhantomData, sync::Arc, time::Duration};
use tokio::{
sync::{mpsc, oneshot, Semaphore},
time::sleep,
};
use tracing::{instrument, Instrument};
use sc_client_api::backend::{Backend, StateBackend, StorageProvider};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_api::{ApiExt, Core, ProvideRuntimeApi};
use sp_block_builder::BlockBuilder;
use sp_blockchain::{
Backend as BlockchainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
};
use sp_runtime::traits::{BlakeTwo256, Block as BlockT, Header as HeaderT};
use substrate_prometheus_endpoint::{
register, Counter, PrometheusError, Registry as PrometheusRegistry, U64,
};
use ethereum_types::H256;
use fc_storage::StorageOverride;
use fp_rpc::EthereumRuntimeRPCApi;
use moonbeam_client_evm_tracing::{
formatters::ResponseFormatter,
types::block::{self, TransactionTrace},
};
pub use moonbeam_rpc_core_trace::{FilterRequest, TraceServer};
use moonbeam_rpc_core_types::{RequestBlockId, RequestBlockTag};
use moonbeam_rpc_primitives_debug::DebugRuntimeApi;
/// Result of tracing every transaction of one block: the traces on success,
/// or a human-readable error message on failure.
type TxsTraceRes = Result<Vec<TransactionTrace>, String>;
/// RPC handler implementing the `trace_filter` endpoint.
/// Cheap to clone: only `Arc`s and a channel handle are duplicated.
pub struct Trace<B, C> {
    // Ties the generic block type `B` to the struct without storing a value of it.
    _phantom: PhantomData<B>,
    // Substrate client, used to resolve block tags/heights to hashes.
    client: Arc<C>,
    // Handle to the trace-cache background task.
    requester: CacheRequester,
    // Maximum number of traces a single request may return.
    max_count: u32,
}
impl<B, C> Clone for Trace<B, C> {
fn clone(&self) -> Self {
Self {
_phantom: PhantomData,
client: Arc::clone(&self.client),
requester: self.requester.clone(),
max_count: self.max_count,
}
}
}
impl<B, C> Trace<B, C>
where
B: BlockT<Hash = H256> + Send + Sync + 'static,
B::Header: HeaderT<Number = u32>,
C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
C: Send + Sync + 'static,
{
pub fn new(client: Arc<C>, requester: CacheRequester, max_count: u32) -> Self {
Self {
client,
requester,
max_count,
_phantom: PhantomData,
}
}
/// Resolve an optional request block id/tag to a concrete block number.
///
/// `None` and `Latest` map to the current best block, `Earliest` to 0.
/// `Pending` and hash lookups are unsupported and produce an error.
fn block_id(&self, id: Option<RequestBlockId>) -> Result<u32, &'static str> {
    // Treat a missing id exactly like an explicit `latest` tag.
    let id = id.unwrap_or(RequestBlockId::Tag(RequestBlockTag::Latest));
    match id {
        RequestBlockId::Number(n) => Ok(n),
        RequestBlockId::Tag(RequestBlockTag::Latest) => Ok(self.client.info().best_number),
        RequestBlockId::Tag(RequestBlockTag::Earliest) => Ok(0),
        RequestBlockId::Tag(RequestBlockTag::Pending) => Err("'pending' is not supported"),
        RequestBlockId::Hash(_) => Err("Block hash not supported"),
    }
}
/// `trace_filter` implementation (called by the `TraceServer` impl below).
///
/// Resolves the requested block range to hashes, registers them as a batch
/// with the cache task (so their traces stay alive while we read them),
/// fetches and filters the traces, then releases the batch.
async fn filter(self, req: FilterRequest) -> TxsTraceRes {
    let from_block = self.block_id(req.from_block)?;
    let to_block = self.block_id(req.to_block)?;
    let block_heights = from_block..=to_block;
    // A missing `count` defaults to the configured maximum; a larger
    // explicit `count` is rejected outright.
    let count = req.count.unwrap_or(self.max_count);
    if count > self.max_count {
        return Err(format!(
            "count ({}) can't be greater than maximum ({})",
            count, self.max_count
        ));
    }
    let mut block_hashes = vec![];
    for block_height in block_heights {
        // Height 0 (genesis) is skipped: it is excluded from tracing here.
        if block_height == 0 {
            continue;
        }
        let block_hash = self
            .client
            .hash(block_height)
            .map_err(|e| {
                format!(
                    "Error when fetching block {} header : {:?}",
                    block_height, e
                )
            })?
            .ok_or_else(|| format!("Block with height {} don't exist", block_height))?;
        block_hashes.push(block_hash);
    }
    // Start the batch before reading so the cache task keeps these blocks.
    let batch_id = self.requester.start_batch(block_hashes.clone()).await?;
    let res = self.fetch_traces(req, &block_hashes, count as usize).await;
    // Always release the batch, even when fetching failed.
    self.requester.stop_batch(batch_id).await;
    res
}
/// Collect the traces of the batched blocks, applying the request's
/// from/to address filters and its `after`/`count` pagination.
async fn fetch_traces(
    &self,
    req: FilterRequest,
    block_hashes: &[H256],
    count: usize,
) -> TxsTraceRes {
    // Empty address sets mean "match any address".
    let from_address = req.from_address.unwrap_or_default();
    let to_address = req.to_address.unwrap_or_default();
    // Starts negative: the first `after` matching traces must be skipped
    // before any trace is actually emitted.
    let mut traces_amount: i64 = -(req.after.unwrap_or(0) as i64);
    let mut traces = vec![];
    for &block_hash in block_hashes {
        let block_traces = self.requester.get_traces(block_hash).await?;
        // Keep only traces whose action matches the address filters.
        let mut block_traces: Vec<_> = block_traces
            .iter()
            .filter(|trace| match trace.action {
                block::TransactionTraceAction::Call { from, to, .. } => {
                    (from_address.is_empty() || from_address.contains(&from))
                        && (to_address.is_empty() || to_address.contains(&to))
                }
                block::TransactionTraceAction::Create { from, .. } => {
                    (from_address.is_empty() || from_address.contains(&from))
                        && to_address.is_empty()
                }
                block::TransactionTraceAction::Suicide { address, .. } => {
                    (from_address.is_empty() || from_address.contains(&address))
                        && to_address.is_empty()
                }
            })
            .cloned()
            .collect();
        traces_amount += block_traces.len() as i64;
        // Positive only once the `after` offset is fully consumed.
        if traces_amount > 0 {
            let traces_amount = traces_amount as usize;
            // This block straddles the `after` boundary: drop the leading
            // part of its traces that still belongs to the offset.
            if traces_amount < block_traces.len() {
                let skip = block_traces.len() - traces_amount;
                block_traces = block_traces.into_iter().skip(skip).collect();
            }
            traces.append(&mut block_traces);
            if traces_amount >= count {
                // Reaching the implicit maximum (no explicit `count`) is an
                // error rather than a silently truncated result.
                if req.count.is_none() {
                    return Err(format!(
                        "the amount of traces goes over the maximum ({}), please use 'after' \
                        and 'count' in your request",
                        self.max_count
                    ));
                }
                traces = traces.into_iter().take(count).collect();
                break;
            }
        }
    }
    Ok(traces)
}
}
#[jsonrpsee::core::async_trait]
impl<B, C> TraceServer for Trace<B, C>
where
    B: BlockT<Hash = H256> + Send + Sync + 'static,
    B::Header: HeaderT<Number = u32>,
    C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
    C: Send + Sync + 'static,
{
    /// `trace_filter` RPC entry point: runs [`Trace::filter`] on a cheap
    /// clone of the handler and maps errors to JSON-RPC internal errors.
    async fn filter(
        &self,
        filter: FilterRequest,
    ) -> jsonrpsee::core::RpcResult<Vec<TransactionTrace>> {
        // The inner `filter` consumes its receiver, hence the clone.
        self.clone()
            .filter(filter)
            .await
            // Function reference instead of a redundant closure (clippy).
            .map_err(fc_rpc::internal_err)
    }
}
/// Opaque identifier of a batch of blocks registered with the cache task.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct CacheBatchId(u64);
/// Messages accepted by the cache task (sent via [`CacheRequester`]).
enum CacheRequest {
    /// Register a batch of blocks to trace; replies with the new batch id.
    StartBatch {
        // Channel on which the assigned batch id is sent back.
        sender: oneshot::Sender<CacheBatchId>,
        // Hashes of the blocks making up the batch.
        blocks: Vec<H256>,
    },
    /// Ask for the traces of one (previously batched) block.
    GetTraces {
        // Channel on which the traces (or an error) are sent back.
        sender: oneshot::Sender<TxsTraceRes>,
        // Hash of the block whose traces are requested.
        block: H256,
    },
    /// Mark a batch as finished; its cache entries expire later. No reply.
    StopBatch { batch_id: CacheBatchId },
}
/// Cloneable handle used by the RPC handlers to talk to the cache task.
#[derive(Clone)]
pub struct CacheRequester(TracingUnboundedSender<CacheRequest>);
impl CacheRequester {
/// Register a batch of blocks with the cache task and wait for its id.
///
/// The task traces blocks not yet known, or bumps the refcount of blocks
/// already pooled/cached, until the batch is stopped and expires.
#[instrument(skip(self))]
pub async fn start_batch(&self, blocks: Vec<H256>) -> Result<CacheBatchId, String> {
    let (response_tx, response_rx) = oneshot::channel();

    self.0
        .clone()
        .unbounded_send(CacheRequest::StartBatch {
            sender: response_tx,
            blocks,
        })
        .map_err(|e| {
            format!(
                "Failed to send request to the trace cache task. Error : {:?}",
                e
            )
        })?;

    // Resolves once the cache task has assigned an id to this batch.
    response_rx.await.map_err(|e| {
        format!(
            "Trace cache task closed the response channel. Error : {:?}",
            e
        )
    })
}
/// Ask the cache task for the traces of a single batched block, waiting
/// until they are available if the block is still being traced.
#[instrument(skip(self))]
pub async fn get_traces(&self, block: H256) -> TxsTraceRes {
    let (response_tx, response_rx) = oneshot::channel();

    self.0
        .clone()
        .unbounded_send(CacheRequest::GetTraces {
            sender: response_tx,
            block,
        })
        .map_err(|e| {
            format!(
                "Failed to send request to the trace cache task. Error : {:?}",
                e
            )
        })?;

    // Two failure layers: the response channel may close, and the replay
    // itself may have failed.
    let traces = response_rx.await.map_err(|e| {
        format!(
            "Trace cache task closed the response channel. Error : {:?}",
            e
        )
    })?;
    traces.map_err(|e| format!("Failed to replay block. Error : {:?}", e))
}
#[instrument(skip(self))]
pub async fn stop_batch(&self, batch_id: CacheBatchId) {
let sender = self.0.clone();
let _ = sender
.unbounded_send(CacheRequest::StopBatch { batch_id })
.map_err(|e| {
format!(
"Failed to send request to the trace cache task. Error : {:?}",
e
)
});
}
}
/// Cache entry for a single block.
struct CacheBlock {
    // Number of live batches referencing this block; the entry is dropped
    // once this reaches zero through batch expiration.
    active_batch_count: usize,
    // Current tracing state (queued/in-progress or done).
    state: CacheBlockState,
}
/// Lifecycle of a block inside the cache.
enum CacheBlockState {
    /// Tracing is queued or running.
    Pooled {
        // True once the tracing task acquired its permit; a started task
        // can no longer be cancelled (see `request_stop_batch`).
        started: bool,
        // Requests that arrived before tracing finished; all of them are
        // answered when the result becomes available.
        waiting_requests: Vec<oneshot::Sender<TxsTraceRes>>,
        // Held only for its drop side effect: dropping this entry closes
        // the channel, which cancels the queued tracing task if it has not
        // started yet (see the `select!` in `request_start_batch`).
        #[allow(dead_code)]
        unqueue_sender: oneshot::Sender<()>,
    },
    /// Tracing finished; the result (or error) is stored.
    Cached { traces: TxsTraceRes },
}
/// Progress notifications sent by the spawned tracing tasks back to the
/// main cache task.
enum BlockingTaskMessage {
    /// The tracing task acquired a permit and is no longer cancellable.
    Started { block_hash: H256 },
    /// Tracing finished with the given result.
    Finished {
        block_hash: H256,
        result: TxsTraceRes,
    },
}
/// Background task owning the trace cache: traces blocks on demand,
/// shares work between overlapping batches and expires entries after use.
pub struct CacheTask<B, C, BE> {
    client: Arc<C>,
    backend: Arc<BE>,
    // Bounds how many blocks are traced concurrently on the blocking pool.
    blocking_permits: Arc<Semaphore>,
    // Per-block cache entries, keyed by Substrate block hash.
    cached_blocks: BTreeMap<H256, CacheBlock>,
    // Blocks belonging to each live batch, keyed by batch id.
    batches: BTreeMap<u64, Vec<H256>>,
    // Id handed to the next `StartBatch` request (wraps on overflow).
    next_batch_id: u64,
    // Optional Prometheus cache hit/miss counters.
    metrics: Option<Metrics>,
    _phantom: PhantomData<B>,
}
impl<B, C, BE> CacheTask<B, C, BE>
where
BE: Backend<B> + 'static,
BE::State: StateBackend<BlakeTwo256>,
C: ProvideRuntimeApi<B>,
C: StorageProvider<B, BE>,
C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
C: Send + Sync + 'static,
B: BlockT<Hash = H256> + Send + Sync + 'static,
B::Header: HeaderT<Number = u32>,
C::Api: BlockBuilder<B>,
C::Api: DebugRuntimeApi<B>,
C::Api: EthereumRuntimeRPCApi<B>,
C::Api: ApiExt<B>,
{
/// Create the cache task and its requester handle.
///
/// Returns the future driving the task (to be spawned by the caller) and
/// the [`CacheRequester`] used to communicate with it. Cache entries of a
/// stopped batch survive for `cache_duration` before being removed.
pub fn create(
    client: Arc<C>,
    backend: Arc<BE>,
    cache_duration: Duration,
    blocking_permits: Arc<Semaphore>,
    overrides: Arc<dyn StorageOverride<B>>,
    prometheus: Option<PrometheusRegistry>,
) -> (impl Future<Output = ()>, CacheRequester) {
    // Channel through which the RPC handlers send their requests.
    let (requester_tx, mut requester_rx) =
        sc_utils::mpsc::tracing_unbounded("trace-filter-cache", 100_000);
    let task = async move {
        // Timers for stopped batches waiting for their entries to expire.
        let mut batch_expirations = FuturesUnordered::new();
        // Channel used by the spawned tracing tasks to report progress.
        let (blocking_tx, mut blocking_rx) =
            mpsc::channel(blocking_permits.available_permits() * 2);
        let metrics = if let Some(registry) = prometheus {
            // BUGFIX: was `Metrics::register(®istry)` — the `&reg` of
            // `&registry` had been mangled into the `®` character.
            match Metrics::register(&registry) {
                Ok(metrics) => Some(metrics),
                Err(err) => {
                    log::error!(target: "tracing", "Failed to register metrics {err:?}");
                    None
                }
            }
        } else {
            None
        };
        let mut inner = Self {
            client,
            backend,
            blocking_permits,
            cached_blocks: BTreeMap::new(),
            batches: BTreeMap::new(),
            next_batch_id: 0,
            metrics,
            _phantom: Default::default(),
        };
        // Main loop: multiplex RPC requests, tracing-task progress
        // messages and batch expirations. Exits when all requesters are
        // dropped (the request stream ends).
        loop {
            select! {
                request = requester_rx.next() => {
                    match request {
                        None => break,
                        Some(CacheRequest::StartBatch {sender, blocks})
                            => inner.request_start_batch(&blocking_tx, sender, blocks, overrides.clone()),
                        Some(CacheRequest::GetTraces {sender, block})
                            => inner.request_get_traces(sender, block),
                        Some(CacheRequest::StopBatch {batch_id}) => {
                            // Schedule the expiration; entries are only
                            // dropped once `cache_duration` has elapsed.
                            batch_expirations.push(async move {
                                sleep(cache_duration).await;
                                batch_id
                            });
                            inner.request_stop_batch(batch_id);
                        },
                    }
                },
                message = blocking_rx.recv().fuse() => {
                    match message {
                        None => (),
                        Some(BlockingTaskMessage::Started { block_hash })
                            => inner.blocking_started(block_hash),
                        Some(BlockingTaskMessage::Finished { block_hash, result })
                            => inner.blocking_finished(block_hash, result),
                    }
                },
                batch_id = batch_expirations.next() => {
                    match batch_id {
                        None => (),
                        Some(batch_id) => inner.expired_batch(batch_id),
                    }
                }
            }
        }
    }
    .instrument(tracing::debug_span!("trace_filter_cache"));
    (task, CacheRequester(requester_tx))
}
/// Handle a `StartBatch` request: record the batch and ensure every block
/// is either already pooled/cached (refcount bump) or queued for tracing.
#[instrument(skip(self, blocking_tx, sender, blocks, overrides))]
fn request_start_batch(
    &mut self,
    blocking_tx: &mpsc::Sender<BlockingTaskMessage>,
    sender: oneshot::Sender<CacheBatchId>,
    blocks: Vec<H256>,
    overrides: Arc<dyn StorageOverride<B>>,
) {
    tracing::trace!("Starting batch {}", self.next_batch_id);
    self.batches.insert(self.next_batch_id, blocks.clone());
    for block in blocks {
        if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
            // Known block: just count one more batch referencing it.
            block_cache.active_batch_count += 1;
            tracing::trace!(
                "Cache hit for block {}, now used by {} batches.",
                block,
                block_cache.active_batch_count
            );
        } else {
            // Unknown block: spawn a task tracing it on the blocking pool.
            tracing::trace!("Cache miss for block {}, pooling it for tracing.", block);
            let blocking_permits = Arc::clone(&self.blocking_permits);
            // Dropping the cache entry drops `unqueue_sender`, which
            // completes `unqueue_receiver` and cancels the task below if
            // it is still waiting for a permit.
            let (unqueue_sender, unqueue_receiver) = oneshot::channel();
            let client = Arc::clone(&self.client);
            let backend = Arc::clone(&self.backend);
            let blocking_tx = blocking_tx.clone();
            let overrides = overrides.clone();
            tokio::spawn(
                async move {
                    tracing::trace!("Waiting for blocking permit or task cancellation");
                    let _permit = select!(
                        _ = unqueue_receiver.fuse() => {
                            tracing::trace!("Tracing of the block has been cancelled.");
                            return;
                        },
                        permit = blocking_permits.acquire().fuse() => permit,
                    );
                    // From here on the task is not cancellable; tell the
                    // main task so it flips `started` to true.
                    let _ = blocking_tx
                        .send(BlockingTaskMessage::Started { block_hash: block })
                        .await;
                    tracing::trace!("Start block tracing in a blocking task.");
                    // Replay the block on the blocking thread pool; a panic
                    // there surfaces as a `JoinError`, mapped to a string.
                    let result = async {
                        tokio::task::spawn_blocking(move || {
                            Self::cache_block(client, backend, block, overrides.clone())
                        })
                        .await
                        .map_err(|e| {
                            format!("Tracing Substrate block {} panicked : {:?}", block, e)
                        })?
                    }
                    .await
                    .map_err(|e| e.to_string());
                    tracing::trace!("Block tracing finished, sending result to main task.");
                    let _ = blocking_tx
                        .send(BlockingTaskMessage::Finished {
                            block_hash: block,
                            result,
                        })
                        .await;
                }
                .instrument(tracing::trace_span!("Block tracing", block = %block)),
            );
            // Insert the entry in the pooled state; `get_traces` callers
            // queue up on `waiting_requests` until tracing completes.
            self.cached_blocks.insert(
                block,
                CacheBlock {
                    active_batch_count: 1,
                    state: CacheBlockState::Pooled {
                        started: false,
                        waiting_requests: vec![],
                        unqueue_sender,
                    },
                },
            );
        }
    }
    // Reply with this batch's id and prepare the next one (wrapping add).
    let _ = sender.send(CacheBatchId(self.next_batch_id));
    self.next_batch_id = self.next_batch_id.overflowing_add(1).0;
}
/// Handle a `GetTraces` request for one block.
///
/// A still-pooled block queues the sender until tracing finishes; a cached
/// block is answered immediately. Asking for a never-batched block is a
/// protocol misuse and yields an error.
#[instrument(skip(self))]
fn request_get_traces(&mut self, sender: oneshot::Sender<TxsTraceRes>, block: H256) {
    if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
        match &mut block_cache.state {
            CacheBlockState::Pooled {
                ref mut waiting_requests,
                ..
            } => {
                // Normal flow: logged at trace level like the rest of the
                // cache bookkeeping (was `warn!`, which spammed the logs).
                tracing::trace!(
                    "A request asked a pooled block ({}), adding it to the list of \
                    waiting requests.",
                    block
                );
                waiting_requests.push(sender);
                if let Some(metrics) = &self.metrics {
                    metrics.tracing_cache_misses.inc();
                }
            }
            CacheBlockState::Cached { traces, .. } => {
                // Normal flow as well: downgraded from `warn!` to `trace!`.
                tracing::trace!(
                    "A request asked a cached block ({}), sending the traces directly.",
                    block
                );
                let _ = sender.send(traces.clone());
                if let Some(metrics) = &self.metrics {
                    metrics.tracing_cache_hits.inc();
                }
            }
        }
    } else {
        // Genuine anomaly: keep this one at warn level.
        tracing::warn!(
            "An RPC request asked to get a block ({}) which was not batched.",
            block
        );
        let _ = sender.send(Err(format!(
            "RPC request asked a block ({}) that was not batched",
            block
        )));
    }
}
/// Handle a `StopBatch` request.
///
/// Cache entries normally survive until the batch expires (see
/// `expired_batch`); only blocks whose tracing has not started AND that no
/// other batch references are removed right away — dropping their entry
/// drops `unqueue_sender`, cancelling the queued tracing task.
#[instrument(skip(self))]
fn request_stop_batch(&mut self, batch_id: CacheBatchId) {
    tracing::trace!("Stopping batch {}", batch_id.0);
    if let Some(blocks) = self.batches.get(&batch_id.0) {
        for block in blocks {
            let mut remove = false;
            if let Some(block_cache) = self.cached_blocks.get_mut(block) {
                // Only this batch uses the block and tracing never started.
                if block_cache.active_batch_count == 1
                    && matches!(
                        block_cache.state,
                        CacheBlockState::Pooled { started: false, .. }
                    )
                {
                    remove = true;
                }
            }
            if remove {
                tracing::trace!("Pooled block {} is no longer requested.", block);
                let _ = self.cached_blocks.remove(&block);
            }
        }
    }
}
#[instrument(skip(self))]
fn blocking_started(&mut self, block_hash: H256) {
if let Some(block_cache) = self.cached_blocks.get_mut(&block_hash) {
if let CacheBlockState::Pooled {
ref mut started, ..
} = block_cache.state
{
*started = true;
}
}
}
/// A tracing task completed: answer every queued request with a copy of
/// the result and move the block to the cached state.
#[instrument(skip(self, result))]
fn blocking_finished(&mut self, block_hash: H256, result: TxsTraceRes) {
    let Some(block_cache) = self.cached_blocks.get_mut(&block_hash) else {
        // The entry may have been removed meanwhile; nothing to do.
        return;
    };
    if let CacheBlockState::Pooled {
        waiting_requests, ..
    } = &mut block_cache.state
    {
        tracing::trace!(
            "A new block ({}) has been traced, adding it to the cache and responding to \
            {} waiting requests.",
            block_hash,
            waiting_requests.len()
        );
        // Each waiting oneshot gets its own clone of the result.
        for channel in waiting_requests.drain(..) {
            let _ = channel.send(result.clone());
        }
        block_cache.state = CacheBlockState::Cached { traces: result };
    }
}
/// A stopped batch's grace period elapsed: unreference its blocks and drop
/// the cache entries no other batch still uses.
#[instrument(skip(self))]
fn expired_batch(&mut self, batch_id: CacheBatchId) {
    if let Some(batch) = self.batches.remove(&batch_id.0) {
        for block in batch {
            // Compute whether this was the last batch holding the block.
            let drop_entry = match self.cached_blocks.get_mut(&block) {
                Some(cache) => {
                    cache.active_batch_count -= 1;
                    cache.active_batch_count == 0
                }
                // Already removed by an early `request_stop_batch` cleanup.
                None => false,
            };
            if drop_entry {
                self.cached_blocks.remove(&block);
            }
        }
    }
}
/// Trace every Ethereum transaction of a Substrate block by re-executing
/// it through the tracing runtime API, then shape the captured events into
/// RPC `TransactionTrace` objects. CPU heavy — runs via `spawn_blocking`.
#[instrument(skip(client, backend, overrides))]
fn cache_block(
    client: Arc<C>,
    backend: Arc<BE>,
    substrate_hash: H256,
    overrides: Arc<dyn StorageOverride<B>>,
) -> TxsTraceRes {
    // Fetch the Substrate header of the block to replay.
    let api = client.runtime_api();
    let block_header = client
        .header(substrate_hash)
        .map_err(|e| {
            format!(
                "Error when fetching substrate block {} header : {:?}",
                substrate_hash, e
            )
        })?
        .ok_or_else(|| format!("Substrate block {} don't exist", substrate_hash))?;
    let height = *block_header.number();
    let substrate_parent_hash = *block_header.parent_hash();
    // Ethereum block and transaction statuses mapped to this Substrate
    // block through the storage override.
    let (eth_block, eth_transactions) = match (
        overrides.current_block(substrate_hash),
        overrides.current_transaction_statuses(substrate_hash),
    ) {
        (Some(a), Some(b)) => (a, b),
        _ => {
            return Err(format!(
                "Failed to get Ethereum block data for Substrate block {}",
                substrate_hash
            ))
        }
    };
    let eth_block_hash = eth_block.header.hash();
    let eth_tx_hashes = eth_transactions
        .iter()
        .map(|t| t.transaction_hash)
        .collect();
    // Extrinsics to replay on top of the parent state.
    let extrinsics = backend
        .blockchain()
        .body(substrate_hash)
        .map_err(|e| {
            format!(
                "Blockchain error when fetching extrinsics of block {} : {:?}",
                height, e
            )
        })?
        .ok_or_else(|| format!("Could not find block {} when fetching extrinsics.", height))?;
    // The tracing API version decides which runtime call variant to use.
    let trace_api_version = if let Ok(Some(api_version)) =
        api.api_version::<dyn DebugRuntimeApi<B>>(substrate_parent_hash)
    {
        api_version
    } else {
        // FIX: was `Err(format!("..."))` with no arguments (clippy
        // `useless_format`); `.to_string()` matches the (core) branch below.
        return Err("Runtime api version call failed (trace)".to_string());
    };
    // Closure executed under the call-list listener: replays the block and
    // propagates runtime/dispatch errors as strings.
    let f = || -> Result<_, String> {
        let result = if trace_api_version >= 5 {
            // Version >= 5 takes the header and initializes the block itself.
            api.trace_block(
                substrate_parent_hash,
                extrinsics,
                eth_tx_hashes,
                &block_header,
            )
        } else {
            // Older runtimes need the block initialized manually first,
            // with a version-dependent initialization call.
            let core_api_version = if let Ok(Some(api_version)) =
                api.api_version::<dyn Core<B>>(substrate_parent_hash)
            {
                api_version
            } else {
                return Err("Runtime api version call failed (core)".to_string());
            };
            if core_api_version >= 5 {
                api.initialize_block(substrate_parent_hash, &block_header)
                    .map_err(|e| format!("Runtime api access error: {:?}", e))?;
            } else {
                #[allow(deprecated)]
                api.initialize_block_before_version_5(substrate_parent_hash, &block_header)
                    .map_err(|e| format!("Runtime api access error: {:?}", e))?;
            }
            #[allow(deprecated)]
            api.trace_block_before_version_5(substrate_parent_hash, extrinsics, eth_tx_hashes)
        };
        result
            .map_err(|e| format!("Blockchain error when replaying block {} : {:?}", height, e))?
            .map_err(|e| {
                tracing::warn!(
                    target: "tracing",
                    "Internal runtime error when replaying block {} : {:?}",
                    height,
                    e
                );
                format!(
                    "Internal runtime error when replaying block {} : {:?}",
                    height, e
                )
            })?;
        Ok(moonbeam_rpc_primitives_debug::Response::Block)
    };
    // Index -> hash map used to stamp each trace with its transaction hash.
    let eth_transactions_by_index: BTreeMap<u32, H256> = eth_transactions
        .iter()
        .map(|t| (t.transaction_index, t.transaction_hash))
        .collect();
    // Run the replay while the call-list listener records EVM events.
    let mut proxy = moonbeam_client_evm_tracing::listeners::CallList::default();
    proxy.using(f)?;
    // Format the recorded events, attach block metadata and drop traces
    // that do not correspond to a known Ethereum transaction.
    let traces: Vec<TransactionTrace> =
        moonbeam_client_evm_tracing::formatters::TraceFilter::format(proxy)
            .ok_or("Fail to format proxy")?
            .into_iter()
            .filter_map(|mut trace| {
                match eth_transactions_by_index.get(&trace.transaction_position) {
                    Some(transaction_hash) => {
                        trace.block_hash = eth_block_hash;
                        trace.block_number = height;
                        trace.transaction_hash = *transaction_hash;
                        // Replace the raw revert message with the short form.
                        if let block::TransactionTraceOutput::Error(ref mut error) =
                            trace.output
                        {
                            if error.as_slice() == b"execution reverted" {
                                *error = b"Reverted".to_vec();
                            }
                        }
                        Some(trace)
                    }
                    None => {
                        log::warn!(
                            target: "tracing",
                            "A trace in block {} does not map to any known ethereum transaction. Trace: {:?}",
                            height,
                            trace,
                        );
                        None
                    }
                }
            })
            .collect();
    Ok(traces)
}
}
/// Prometheus counters for the trace cache.
#[derive(Clone)]
pub(crate) struct Metrics {
    // Incremented when a `GetTraces` request hits an already-cached block.
    tracing_cache_hits: Counter<U64>,
    // Incremented when a `GetTraces` request has to wait for tracing.
    tracing_cache_misses: Counter<U64>,
}
impl Metrics {
    /// Create and register the cache hit/miss counters in the given
    /// Prometheus registry.
    pub(crate) fn register(registry: &PrometheusRegistry) -> Result<Self, PrometheusError> {
        let tracing_cache_hits = register(
            Counter::new("tracing_cache_hits", "Number of tracing cache hits.")?,
            registry,
        )?;
        let tracing_cache_misses = register(
            Counter::new("tracing_cache_misses", "Number of tracing cache misses.")?,
            registry,
        )?;
        Ok(Self {
            tracing_cache_hits,
            tracing_cache_misses,
        })
    }
}