moonbeam_service/
lib.rs

1// Copyright 2019-2025 PureStake Inc.
2// This file is part of Moonbeam.
3
4// Moonbeam is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Moonbeam is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
16
17//! This module assembles the Moonbeam service components, executes them, and manages communication
18//! between them. This is the backbone of the client-side node implementation.
19//!
20//! This module can assemble:
21//! PartialComponents: For maintenance tasks without a complete node (eg import/export blocks, purge)
22//! Full Service: A complete parachain node including the pool, rpc, network, embedded relay chain
23//! Dev Service: A leaner service without the relay chain backing.
24
25pub mod rpc;
26
27use cumulus_client_cli::CollatorOptions;
28use cumulus_client_collator::service::CollatorService;
29use cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport;
30use cumulus_client_consensus_proposer::Proposer;
31use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
32use cumulus_client_service::{
33	prepare_node_config, start_relay_chain_tasks, CollatorSybilResistance, DARecoveryProfile,
34	ParachainHostFunctions, StartRelayChainTasksParams,
35};
36use cumulus_primitives_core::{
37	relay_chain::{self, well_known_keys, CollatorPair},
38	CollectCollationInfo, ParaId,
39};
40use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
41use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface, RelayChainResult};
42use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
43use fc_consensus::FrontierBlockImport as TFrontierBlockImport;
44use fc_db::DatabaseSource;
45use fc_rpc::StorageOverrideHandler;
46use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
47use futures::{FutureExt, StreamExt};
48use maplit::hashmap;
49#[cfg(feature = "moonbase-native")]
50pub use moonbase_runtime;
51use moonbeam_cli_opt::{
52	AuthoringPolicy, EthApi as EthApiCmd, FrontierBackendConfig, NodeExtraArgs, RpcConfig,
53};
54#[cfg(feature = "moonbeam-native")]
55pub use moonbeam_runtime;
56use moonbeam_vrf::VrfDigestsProvider;
57#[cfg(feature = "moonriver-native")]
58pub use moonriver_runtime;
59use nimbus_consensus::collators::slot_based::SlotBasedBlockImportHandle;
60use nimbus_consensus::{
61	collators::slot_based::SlotBasedBlockImport, NimbusManualSealConsensusDataProvider,
62};
63use nimbus_primitives::{DigestsProvider, NimbusId};
64use polkadot_primitives::{AbridgedHostConfiguration, AsyncBackingParams, Slot, UpgradeGoAhead};
65use sc_client_api::{
66	backend::{AuxStore, Backend, StateBackend, StorageProvider},
67	ExecutorProvider,
68};
69use sc_consensus::{BlockImport, ImportQueue};
70use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
71use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NetworkBlock};
72use sc_service::config::PrometheusConfig;
73use sc_service::{
74	error::Error as ServiceError, ChainSpec, Configuration, PartialComponents, TFullBackend,
75	TFullClient, TaskManager,
76};
77use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle};
78use sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool};
79use session_keys_primitives::VrfApi;
80use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi};
81use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
82use sp_consensus::SyncOracle;
83use sp_core::{ByteArray, Encode, H256};
84use sp_keystore::{Keystore, KeystorePtr};
85use std::str::FromStr;
86use std::sync::atomic::{AtomicU64, Ordering};
87use std::sync::Arc;
88use std::{collections::BTreeMap, path::Path, sync::Mutex, time::Duration};
89use substrate_prometheus_endpoint::Registry;
90
91pub use client::*;
92pub mod chain_spec;
93mod client;
94#[cfg(feature = "lazy-loading")]
95pub mod lazy_loading;
96
/// Full client used by the service: `TFullClient` over the node's `Block`, the given runtime API
/// and a `WasmExecutor` parameterised with this crate's `HostFunctions`.
type FullClient<RuntimeApi> = TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
/// Full backend over the node's `Block`.
type FullBackend = TFullBackend<Block>;

/// Optional longest-chain selection rule. `new_partial` only fills it in for dev chain specs.
type MaybeSelectChain<Backend> = Option<sc_consensus::LongestChain<Backend, Block>>;
/// Frontier block import instantiated with `Arc<Client>` and `Client`.
type FrontierBlockImport<Client> = TFrontierBlockImport<Block, Arc<Client>, Client>;
/// Return type of `new_partial`.
///
/// The `other` tuple carries, in order: the block import, the optional filter pool, the optional
/// telemetry, the optional telemetry worker handle, the Frontier backend and the fee history
/// cache.
type PartialComponentsResult<Client, Backend> = Result<
	PartialComponents<
		Client,
		Backend,
		MaybeSelectChain<Backend>,
		sc_consensus::DefaultImportQueue<Block>,
		sc_transaction_pool::TransactionPoolHandle<Block, Client>,
		(
			MoonbeamBlockImport<Client>,
			Option<FilterPool>,
			Option<Telemetry>,
			Option<TelemetryWorkerHandle>,
			Arc<fc_db::Backend<Block, Client>>,
			FeeHistoryCache,
		),
	>,
	ServiceError,
>;

/// Relay chain slot duration, in milliseconds (6 seconds).
// NOTE(review): the users of this constant are outside the visible part of the file.
const RELAY_CHAIN_SLOT_DURATION_MILLIS: u64 = 6_000;
122
123static TIMESTAMP: AtomicU64 = AtomicU64::new(0);
124
125/// Provide a mock duration starting at 0 in millisecond for timestamp inherent.
126/// Each call will increment timestamp by slot_duration making Aura think time has passed.
127struct MockTimestampInherentDataProvider;
128
129impl MockTimestampInherentDataProvider {
130	fn advance_timestamp(slot_duration: u64) {
131		if TIMESTAMP.load(Ordering::SeqCst) == 0 {
132			// Initialize timestamp inherent provider
133			TIMESTAMP.store(
134				sp_timestamp::Timestamp::current().as_millis(),
135				Ordering::SeqCst,
136			);
137		} else {
138			TIMESTAMP.fetch_add(slot_duration, Ordering::SeqCst);
139		}
140	}
141}
142
143#[async_trait::async_trait]
144impl sp_inherents::InherentDataProvider for MockTimestampInherentDataProvider {
145	async fn provide_inherent_data(
146		&self,
147		inherent_data: &mut sp_inherents::InherentData,
148	) -> Result<(), sp_inherents::Error> {
149		inherent_data.put_data(
150			sp_timestamp::INHERENT_IDENTIFIER,
151			&TIMESTAMP.load(Ordering::SeqCst),
152		)
153	}
154
155	async fn try_handle_error(
156		&self,
157		_identifier: &sp_inherents::InherentIdentifier,
158		_error: &[u8],
159	) -> Option<Result<(), sp_inherents::Error>> {
160		// The pallet never reports error.
161		None
162	}
163}
164
/// Host functions used to parameterise the node's `WasmExecutor`.
///
/// This definition is compiled with the `runtime-benchmarks` feature and additionally includes
/// the benchmarking host functions.
#[cfg(feature = "runtime-benchmarks")]
pub type HostFunctions = (
	frame_benchmarking::benchmarking::HostFunctions,
	ParachainHostFunctions,
	moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
);
/// Host functions used to parameterise the node's `WasmExecutor`.
///
/// This definition is compiled without the `runtime-benchmarks` feature: parachain host functions
/// plus the Moonbeam extension host functions.
#[cfg(not(feature = "runtime-benchmarks"))]
pub type HostFunctions = (
	ParachainHostFunctions,
	moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
);
176
177pub enum MoonbeamBlockImport<Client> {
178	/// Used in dev mode to import new blocks as best blocks.
179	Dev(FrontierBlockImport<Client>),
180	/// Used in parachain mode with lookahead authoring policy.
181	ParachainLookahead(FrontierBlockImport<Client>),
182	/// Used in parachain mode with slot-based authoring policy.
183	ParachainSlotBased(
184		SlotBasedBlockImport<Block, FrontierBlockImport<Client>, Client>,
185		SlotBasedBlockImportHandle<Block>,
186	),
187}
188
189impl<Client> MoonbeamBlockImport<Client>
190where
191	Client: Send + Sync + 'static,
192	Client: ProvideRuntimeApi<Block>,
193	Client::Api: sp_block_builder::BlockBuilder<Block> + fp_rpc::EthereumRuntimeRPCApi<Block>,
194	FrontierBlockImport<Client>: BlockImport<Block>,
195	SlotBasedBlockImport<Block, FrontierBlockImport<Client>, Client>: BlockImport<Block, Error = <FrontierBlockImport<Client> as BlockImport<Block>>::Error>
196		+ 'static,
197{
198	fn new(
199		frontier_block_import: FrontierBlockImport<Client>,
200		client: Arc<Client>,
201		authoring_policy: AuthoringPolicy,
202		is_dev: bool,
203	) -> Self {
204		if is_dev {
205			MoonbeamBlockImport::Dev(frontier_block_import)
206		} else {
207			match authoring_policy {
208				AuthoringPolicy::Lookahead => {
209					MoonbeamBlockImport::ParachainLookahead(frontier_block_import)
210				}
211				AuthoringPolicy::SlotBased => {
212					let (block_import, block_import_auxiliary_data) =
213						SlotBasedBlockImport::new(frontier_block_import, client);
214					MoonbeamBlockImport::ParachainSlotBased(
215						block_import,
216						block_import_auxiliary_data,
217					)
218				}
219			}
220		}
221	}
222}
223
/// A trait that must be implemented for every moon* network supported by the client.
///
/// This feature allows, for instance, to customize the client extensions according to the type
/// of network.
/// For the moment, this feature is only used to specify the first block compatible with
/// ed25519-zebra, but it could be used for other things in the future.
pub trait ClientCustomizations {
	/// The host function ed25519_verify has changed its behavior in the substrate history,
	/// because of the change from lib ed25519-dalek to lib ed25519-zebra.
	/// Some networks may have old blocks that are not compatible with ed25519-zebra,
	/// for these networks this function should return the 1st block compatible with the new lib.
	/// If this function returns None (default behavior), it implies that all blocks are compatible
	/// with the new lib (ed25519-zebra).
	///
	/// `new_partial` passes a returned block number to an `ExtensionBeforeBlock` extensions
	/// factory carrying `sp_io::UseDalekExt`.
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		None
	}
}
241
/// Client customizations for the Moonbeam network (requires the `moonbeam-native` feature).
#[cfg(feature = "moonbeam-native")]
pub struct MoonbeamCustomizations;
#[cfg(feature = "moonbeam-native")]
impl ClientCustomizations for MoonbeamCustomizations {
	/// Reports block 2,000,000 as the first block compatible with ed25519-zebra.
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(2_000_000)
	}
}

/// Client customizations for the Moonriver network (requires the `moonriver-native` feature).
#[cfg(feature = "moonriver-native")]
pub struct MoonriverCustomizations;
#[cfg(feature = "moonriver-native")]
impl ClientCustomizations for MoonriverCustomizations {
	/// Reports block 3,000,000 as the first block compatible with ed25519-zebra.
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(3_000_000)
	}
}

/// Client customizations for the Moonbase network (requires the `moonbase-native` feature).
#[cfg(feature = "moonbase-native")]
pub struct MoonbaseCustomizations;
#[cfg(feature = "moonbase-native")]
impl ClientCustomizations for MoonbaseCustomizations {
	/// Reports block 3,000,000 as the first block compatible with ed25519-zebra.
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(3_000_000)
	}
}
268
/// Trivial enum representing runtime variant
///
/// Each network variant only exists when the matching `*-native` feature is enabled;
/// `Unrecognized` is always available.
#[derive(Clone)]
pub enum RuntimeVariant {
	#[cfg(feature = "moonbeam-native")]
	Moonbeam,
	#[cfg(feature = "moonriver-native")]
	Moonriver,
	#[cfg(feature = "moonbase-native")]
	Moonbase,
	/// The chain spec matched none of the runtimes compiled into this binary.
	Unrecognized,
}

impl RuntimeVariant {
	/// Derives the runtime variant from a chain spec.
	///
	/// The checks run in the order Moonbeam, Moonriver, Moonbase, each only when its feature is
	/// enabled. The first match wins; anything else yields `Unrecognized`.
	pub fn from_chain_spec(chain_spec: &Box<dyn ChainSpec>) -> Self {
		match chain_spec {
			#[cfg(feature = "moonbeam-native")]
			spec if spec.is_moonbeam() => Self::Moonbeam,
			#[cfg(feature = "moonriver-native")]
			spec if spec.is_moonriver() => Self::Moonriver,
			#[cfg(feature = "moonbase-native")]
			spec if spec.is_moonbase() => Self::Moonbase,
			// No compiled-in runtime matched the chain spec.
			_ => Self::Unrecognized,
		}
	}
}
294
295/// Can be called for a `Configuration` to check if it is a configuration for
296/// the `Moonbeam` network.
297pub trait IdentifyVariant {
298	/// Returns `true` if this is a configuration for the `Moonbase` network.
299	fn is_moonbase(&self) -> bool;
300
301	/// Returns `true` if this is a configuration for the `Moonbeam` network.
302	fn is_moonbeam(&self) -> bool;
303
304	/// Returns `true` if this is a configuration for the `Moonriver` network.
305	fn is_moonriver(&self) -> bool;
306
307	/// Returns `true` if this is a configuration for a dev network.
308	fn is_dev(&self) -> bool;
309}
310
311impl IdentifyVariant for Box<dyn ChainSpec> {
312	fn is_moonbase(&self) -> bool {
313		self.id().starts_with("moonbase")
314	}
315
316	fn is_moonbeam(&self) -> bool {
317		self.id().starts_with("moonbeam")
318	}
319
320	fn is_moonriver(&self) -> bool {
321		self.id().starts_with("moonriver")
322	}
323
324	fn is_dev(&self) -> bool {
325		self.chain_type() == sc_chain_spec::ChainType::Development
326	}
327}
328
329pub fn frontier_database_dir(config: &Configuration, path: &str) -> std::path::PathBuf {
330	config
331		.base_path
332		.config_dir(config.chain_spec.id())
333		.join("frontier")
334		.join(path)
335}
336
337// TODO This is copied from frontier. It should be imported instead after
338// https://github.com/paritytech/frontier/issues/333 is solved
339pub fn open_frontier_backend<C, BE>(
340	client: Arc<C>,
341	config: &Configuration,
342	rpc_config: &RpcConfig,
343) -> Result<fc_db::Backend<Block, C>, String>
344where
345	C: ProvideRuntimeApi<Block> + StorageProvider<Block, BE> + AuxStore,
346	C: HeaderBackend<Block> + HeaderMetadata<Block, Error = BlockChainError>,
347	C: Send + Sync + 'static,
348	C::Api: fp_rpc::EthereumRuntimeRPCApi<Block>,
349	BE: Backend<Block> + 'static,
350	BE::State: StateBackend<BlakeTwo256>,
351{
352	let frontier_backend = match rpc_config.frontier_backend_config {
353		FrontierBackendConfig::KeyValue => {
354			fc_db::Backend::KeyValue(Arc::new(fc_db::kv::Backend::<Block, C>::new(
355				client,
356				&fc_db::kv::DatabaseSettings {
357					source: match config.database {
358						DatabaseSource::RocksDb { .. } => DatabaseSource::RocksDb {
359							path: frontier_database_dir(config, "db"),
360							cache_size: 0,
361						},
362						DatabaseSource::ParityDb { .. } => DatabaseSource::ParityDb {
363							path: frontier_database_dir(config, "paritydb"),
364						},
365						DatabaseSource::Auto { .. } => DatabaseSource::Auto {
366							rocksdb_path: frontier_database_dir(config, "db"),
367							paritydb_path: frontier_database_dir(config, "paritydb"),
368							cache_size: 0,
369						},
370						_ => {
371							return Err(
372								"Supported db sources: `rocksdb` | `paritydb` | `auto`".to_string()
373							)
374						}
375					},
376				},
377			)?))
378		}
379		FrontierBackendConfig::Sql {
380			pool_size,
381			num_ops_timeout,
382			thread_count,
383			cache_size,
384		} => {
385			let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
386			let sqlite_db_path = frontier_database_dir(config, "sql");
387			std::fs::create_dir_all(&sqlite_db_path).expect("failed creating sql db directory");
388			let backend = futures::executor::block_on(fc_db::sql::Backend::new(
389				fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
390					path: Path::new("sqlite:///")
391						.join(sqlite_db_path)
392						.join("frontier.db3")
393						.to_str()
394						.expect("frontier sql path error"),
395					create_if_missing: true,
396					thread_count: thread_count,
397					cache_size: cache_size,
398				}),
399				pool_size,
400				std::num::NonZeroU32::new(num_ops_timeout),
401				overrides.clone(),
402			))
403			.unwrap_or_else(|err| panic!("failed creating sql backend: {:?}", err));
404			fc_db::Backend::Sql(Arc::new(backend))
405		}
406	};
407
408	Ok(frontier_backend)
409}
410
411use sp_runtime::{traits::BlakeTwo256, DigestItem, Percent};
412
/// Soft deadline expressed as a percentage; set to 100%.
// NOTE(review): presumably consumed by the block proposer configuration, which is outside the
// visible part of the file. Confirm at the use site.
pub const SOFT_DEADLINE_PERCENT: Percent = Percent::from_percent(100);
414
/// Builds a new object suitable for chain operations.
///
/// Dispatches on the chain spec to the runtime compiled into this binary: Moonriver first, then
/// Moonbeam (each only when its `*-native` feature is enabled). Every other chain spec falls back
/// to the Moonbase runtime when `moonbase-native` is enabled.
///
/// Returns the client, the backend, the import queue and the task manager.
///
/// # Panics
///
/// Panics with "invalid chain spec" when no earlier arm matched and the `moonbase-native`
/// feature is disabled.
#[allow(clippy::type_complexity)]
pub fn new_chain_ops(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	node_extra_args: NodeExtraArgs,
) -> Result<
	(
		Arc<Client>,
		Arc<FullBackend>,
		sc_consensus::BasicQueue<Block>,
		TaskManager,
	),
	ServiceError,
> {
	match &config.chain_spec {
		#[cfg(feature = "moonriver-native")]
		spec if spec.is_moonriver() => new_chain_ops_inner::<
			moonriver_runtime::RuntimeApi,
			MoonriverCustomizations,
		>(config, rpc_config, node_extra_args),
		#[cfg(feature = "moonbeam-native")]
		spec if spec.is_moonbeam() => new_chain_ops_inner::<
			moonbeam_runtime::RuntimeApi,
			MoonbeamCustomizations,
		>(config, rpc_config, node_extra_args),
		// Fallback: any other chain spec is handled with the Moonbase runtime when available.
		#[cfg(feature = "moonbase-native")]
		_ => new_chain_ops_inner::<moonbase_runtime::RuntimeApi, MoonbaseCustomizations>(
			config,
			rpc_config,
			node_extra_args,
		),
		#[cfg(not(feature = "moonbase-native"))]
		_ => panic!("invalid chain spec"),
	}
}
451
452#[allow(clippy::type_complexity)]
453fn new_chain_ops_inner<RuntimeApi, Customizations>(
454	config: &mut Configuration,
455	rpc_config: &RpcConfig,
456	node_extra_args: NodeExtraArgs,
457) -> Result<
458	(
459		Arc<Client>,
460		Arc<FullBackend>,
461		sc_consensus::BasicQueue<Block>,
462		TaskManager,
463	),
464	ServiceError,
465>
466where
467	Client: From<Arc<crate::FullClient<RuntimeApi>>>,
468	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
469	RuntimeApi::RuntimeApi: RuntimeApiCollection,
470	Customizations: ClientCustomizations + 'static,
471{
472	config.keystore = sc_service::config::KeystoreConfig::InMemory;
473	let PartialComponents {
474		client,
475		backend,
476		import_queue,
477		task_manager,
478		..
479	} = new_partial::<RuntimeApi, Customizations>(config, rpc_config, node_extra_args)?;
480	Ok((
481		Arc::new(Client::from(client)),
482		backend,
483		import_queue,
484		task_manager,
485	))
486}
487
488// If we're using prometheus, use a registry with a prefix of `moonbeam`.
489fn set_prometheus_registry(
490	config: &mut Configuration,
491	skip_prefix: bool,
492) -> Result<(), ServiceError> {
493	if let Some(PrometheusConfig { registry, .. }) = config.prometheus_config.as_mut() {
494		let labels = hashmap! {
495			"chain".into() => config.chain_spec.id().into(),
496		};
497		let prefix = if skip_prefix {
498			None
499		} else {
500			Some("moonbeam".into())
501		};
502
503		*registry = Registry::new_custom(prefix, Some(labels))?;
504	}
505
506	Ok(())
507}
508
/// Builds the PartialComponents for a parachain or development service
///
/// Use this function if you don't actually need the full service, but just the partial in order to
/// be able to perform chain operations.
///
/// Besides the standard components, the `other` tuple of the result carries, in order: the block
/// import, the filter pool, the telemetry, the telemetry worker handle, the Frontier backend and
/// the fee history cache.
///
/// Note that `config` is modified: the prometheus registry may be replaced and the RPC
/// subscription id provider is set.
#[allow(clippy::type_complexity)]
pub fn new_partial<RuntimeApi, Customizations>(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	node_extra_args: NodeExtraArgs,
) -> PartialComponentsResult<FullClient<RuntimeApi>, FullBackend>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
{
	set_prometheus_registry(config, rpc_config.no_prometheus_prefix)?;

	// Use ethereum style for subscription ids
	config.rpc.id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));

	// Telemetry is only set up when endpoints are configured and non-empty.
	let telemetry = config
		.telemetry_endpoints
		.clone()
		.filter(|x| !x.is_empty())
		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
			let worker = TelemetryWorker::new(16)?;
			let telemetry = worker.handle().new_telemetry(endpoints);
			Ok((worker, telemetry))
		})
		.transpose()?;

	// A configured `default_heap_pages` selects a static heap allocation strategy; otherwise the
	// executor default is used.
	let heap_pages = config
		.executor
		.default_heap_pages
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
			extra_pages: h as _,
		});
	let mut wasm_builder = WasmExecutor::builder()
		.with_execution_method(config.executor.wasm_method)
		.with_onchain_heap_alloc_strategy(heap_pages)
		.with_offchain_heap_alloc_strategy(heap_pages)
		.with_ignore_onchain_heap_pages(true)
		.with_max_runtime_instances(config.executor.max_runtime_instances)
		.with_runtime_cache_size(config.executor.runtime_cache_size);

	if let Some(ref wasmtime_precompiled_path) = config.executor.wasmtime_precompiled {
		wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
	}

	let executor = wasm_builder.build();

	let (client, backend, keystore_container, task_manager) =
		sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
			config,
			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
			executor,
			true,
		)?;

	// Networks with blocks that predate ed25519-zebra install an extensions factory carrying
	// `UseDalekExt`, parameterised with the first compatible block number.
	// NOTE(review): presumably the extension applies to blocks before `block_number` — confirm
	// against `ExtensionBeforeBlock`.
	if let Some(block_number) = Customizations::first_block_number_compatible_with_ed25519_zebra() {
		client
			.execution_extensions()
			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
			Block,
			sp_io::UseDalekExt,
		>::new(block_number));
	}

	let client = Arc::new(client);

	// Take the worker handle before the worker itself is moved into its task below.
	let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());

	let telemetry = telemetry.map(|(worker, telemetry)| {
		task_manager
			.spawn_handle()
			.spawn("telemetry", None, worker.run());
		telemetry
	});

	// A select chain is only provided for dev chain specs.
	let maybe_select_chain = if config.chain_spec.is_dev() {
		Some(sc_consensus::LongestChain::new(backend.clone()))
	} else {
		None
	};

	let transaction_pool = sc_transaction_pool::Builder::new(
		task_manager.spawn_essential_handle(),
		client.clone(),
		config.role.is_authority().into(),
	)
	.with_options(config.transaction_pool.clone())
	.with_prometheus(config.prometheus_registry())
	.build();

	// Both caches start empty; the filter pool is always present.
	let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
	let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));

	let frontier_backend = Arc::new(open_frontier_backend(client.clone(), config, rpc_config)?);
	let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());
	let block_import = MoonbeamBlockImport::new(
		frontier_block_import.clone(),
		client.clone(),
		node_extra_args.authoring_policy,
		config.chain_spec.is_dev(),
	);

	// The import queue only needs a timestamp inherent, taken from the system time.
	let create_inherent_data_providers = move |_, _| async move {
		let time = sp_timestamp::InherentDataProvider::from_system_time();
		Ok((time,))
	};

	let import_queue = {
		// Dev mode uses the Frontier block import directly. Both parachain variants wrap theirs
		// in a parachain block import, using the delayed-best-block constructor when the legacy
		// block import strategy is requested.
		let block_import_for_queue: Box<
			dyn sc_consensus::BlockImport<Block, Error = sp_consensus::Error> + Send + Sync,
		> = match &block_import {
			MoonbeamBlockImport::Dev(bi) => Box::new(bi.clone()),
			MoonbeamBlockImport::ParachainLookahead(bi) => {
				if node_extra_args.legacy_block_import_strategy {
					Box::new(TParachainBlockImport::new_with_delayed_best_block(
						bi.clone(),
						backend.clone(),
					))
				} else {
					Box::new(TParachainBlockImport::new(bi.clone(), backend.clone()))
				}
			}
			MoonbeamBlockImport::ParachainSlotBased(bi, _handle) => {
				if node_extra_args.legacy_block_import_strategy {
					Box::new(TParachainBlockImport::new_with_delayed_best_block(
						bi.clone(),
						backend.clone(),
					))
				} else {
					Box::new(TParachainBlockImport::new(bi.clone(), backend.clone()))
				}
			}
		};

		nimbus_consensus::import_queue(
			client.clone(),
			block_import_for_queue,
			create_inherent_data_providers,
			&task_manager.spawn_essential_handle(),
			config.prometheus_registry(),
			node_extra_args.legacy_block_import_strategy,
			false,
		)?
	};

	Ok(PartialComponents {
		backend,
		client,
		import_queue,
		keystore_container,
		task_manager,
		transaction_pool: transaction_pool.into(),
		select_chain: maybe_select_chain,
		other: (
			block_import,
			filter_pool,
			telemetry,
			telemetry_worker_handle,
			frontier_backend,
			fee_history_cache,
		),
	})
}
676
677async fn build_relay_chain_interface(
678	polkadot_config: Configuration,
679	parachain_config: &Configuration,
680	telemetry_worker_handle: Option<TelemetryWorkerHandle>,
681	task_manager: &mut TaskManager,
682	collator_options: CollatorOptions,
683	hwbench: Option<sc_sysinfo::HwBench>,
684) -> RelayChainResult<(
685	Arc<(dyn RelayChainInterface + 'static)>,
686	Option<CollatorPair>,
687)> {
688	let result = if let cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =
689		collator_options.relay_chain_mode
690	{
691		build_minimal_relay_chain_node_with_rpc(
692			polkadot_config,
693			parachain_config.prometheus_registry(),
694			task_manager,
695			rpc_target_urls,
696		)
697		.await
698	} else {
699		build_inprocess_relay_chain(
700			polkadot_config,
701			parachain_config,
702			telemetry_worker_handle,
703			task_manager,
704			hwbench,
705		)
706	};
707
708	// Extract only the first two elements from the 4-tuple
709	result
710		.map(|(relay_chain_interface, collator_pair, _, _)| (relay_chain_interface, collator_pair))
711}
712
713/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
714///
715/// This is the actual implementation that is abstract over the executor and the runtime api.
716#[sc_tracing::logging::prefix_logs_with("🌗")]
717async fn start_node_impl<RuntimeApi, Customizations, Net>(
718	parachain_config: Configuration,
719	polkadot_config: Configuration,
720	collator_options: CollatorOptions,
721	para_id: ParaId,
722	rpc_config: RpcConfig,
723	block_authoring_duration: Duration,
724	hwbench: Option<sc_sysinfo::HwBench>,
725	node_extra_args: NodeExtraArgs,
726) -> sc_service::error::Result<(TaskManager, Arc<FullClient<RuntimeApi>>)>
727where
728	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
729	RuntimeApi::RuntimeApi: RuntimeApiCollection
730		+ cumulus_primitives_core::GetCoreSelectorApi<Block>
731		+ cumulus_primitives_core::RelayParentOffsetApi<Block>,
732	Customizations: ClientCustomizations + 'static,
733	Net: NetworkBackend<Block, Hash>,
734{
735	let mut parachain_config = prepare_node_config(parachain_config);
736
737	let params = new_partial::<RuntimeApi, Customizations>(
738		&mut parachain_config,
739		&rpc_config,
740		node_extra_args.clone(),
741	)?;
742	let (
743		block_import,
744		filter_pool,
745		mut telemetry,
746		telemetry_worker_handle,
747		frontier_backend,
748		fee_history_cache,
749	) = params.other;
750
751	let client = params.client.clone();
752	let backend = params.backend.clone();
753	let mut task_manager = params.task_manager;
754
755	let (relay_chain_interface, collator_key) = build_relay_chain_interface(
756		polkadot_config,
757		&parachain_config,
758		telemetry_worker_handle,
759		&mut task_manager,
760		collator_options.clone(),
761		hwbench.clone(),
762	)
763	.await
764	.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
765
766	let force_authoring = parachain_config.force_authoring;
767	let collator = parachain_config.role.is_authority();
768	let prometheus_registry = parachain_config.prometheus_registry().cloned();
769	let transaction_pool = params.transaction_pool.clone();
770	let import_queue_service = params.import_queue.service();
771	let net_config = FullNetworkConfiguration::<_, _, Net>::new(
772		&parachain_config.network,
773		prometheus_registry.clone(),
774	);
775
776	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
777		cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
778			parachain_config: &parachain_config,
779			client: client.clone(),
780			transaction_pool: transaction_pool.clone(),
781			spawn_handle: task_manager.spawn_handle(),
782			import_queue: params.import_queue,
783			para_id: para_id.clone(),
784			relay_chain_interface: relay_chain_interface.clone(),
785			net_config,
786			sybil_resistance_level: CollatorSybilResistance::Resistant,
787			metrics: Net::register_notification_metrics(
788				parachain_config
789					.prometheus_config
790					.as_ref()
791					.map(|config| &config.registry),
792			),
793		})
794		.await?;
795
796	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
797	let fee_history_limit = rpc_config.fee_history_limit;
798
799	// Sinks for pubsub notifications.
800	// Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
801	// The MappingSyncWorker sends through the channel on block import and the subscription emits a
802	// notification to the subscriber on receiving a message through this channel.
803	// This way we avoid race conditions when using native substrate block import notification
804	// stream.
805	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
806		fc_mapping_sync::EthereumBlockNotification<Block>,
807	> = Default::default();
808	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
809
810	rpc::spawn_essential_tasks(
811		rpc::SpawnTasksParams {
812			task_manager: &task_manager,
813			client: client.clone(),
814			substrate_backend: backend.clone(),
815			frontier_backend: frontier_backend.clone(),
816			filter_pool: filter_pool.clone(),
817			overrides: overrides.clone(),
818			fee_history_limit,
819			fee_history_cache: fee_history_cache.clone(),
820		},
821		sync_service.clone(),
822		pubsub_notification_sinks.clone(),
823	);
824
825	let ethapi_cmd = rpc_config.ethapi.clone();
826	let tracing_requesters =
827		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
828			rpc::tracing::spawn_tracing_tasks(
829				&rpc_config,
830				prometheus_registry.clone(),
831				rpc::SpawnTasksParams {
832					task_manager: &task_manager,
833					client: client.clone(),
834					substrate_backend: backend.clone(),
835					frontier_backend: frontier_backend.clone(),
836					filter_pool: filter_pool.clone(),
837					overrides: overrides.clone(),
838					fee_history_limit,
839					fee_history_cache: fee_history_cache.clone(),
840				},
841			)
842		} else {
843			rpc::tracing::RpcRequesters {
844				debug: None,
845				trace: None,
846			}
847		};
848
849	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
850		task_manager.spawn_handle(),
851		overrides.clone(),
852		rpc_config.eth_log_block_cache,
853		rpc_config.eth_statuses_cache,
854		prometheus_registry.clone(),
855	));
856
857	let rpc_builder = {
858		let client = client.clone();
859		let pool = transaction_pool.clone();
860		let network = network.clone();
861		let sync = sync_service.clone();
862		let filter_pool = filter_pool.clone();
863		let frontier_backend = frontier_backend.clone();
864		let backend = backend.clone();
865		let ethapi_cmd = ethapi_cmd.clone();
866		let max_past_logs = rpc_config.max_past_logs;
867		let max_block_range = rpc_config.max_block_range;
868		let overrides = overrides.clone();
869		let fee_history_cache = fee_history_cache.clone();
870		let block_data_cache = block_data_cache.clone();
871		let pubsub_notification_sinks = pubsub_notification_sinks.clone();
872
873		let keystore = params.keystore_container.keystore();
874		move |subscription_task_executor| {
875			#[cfg(feature = "moonbase-native")]
876			let forced_parent_hashes = {
877				let mut forced_parent_hashes = BTreeMap::new();
878				// Fixes for https://github.com/paritytech/frontier/pull/570
879				// #1648995
880				forced_parent_hashes.insert(
881					H256::from_str(
882						"0xa352fee3eef9c554a31ec0612af887796a920613358abf3353727760ea14207b",
883					)
884					.expect("must be valid hash"),
885					H256::from_str(
886						"0x0d0fd88778aec08b3a83ce36387dbf130f6f304fc91e9a44c9605eaf8a80ce5d",
887					)
888					.expect("must be valid hash"),
889				);
890				Some(forced_parent_hashes)
891			};
892			#[cfg(not(feature = "moonbase-native"))]
893			let forced_parent_hashes = None;
894
895			let deps = rpc::FullDeps {
896				backend: backend.clone(),
897				client: client.clone(),
898				command_sink: None,
899				ethapi_cmd: ethapi_cmd.clone(),
900				filter_pool: filter_pool.clone(),
901				frontier_backend: match &*frontier_backend {
902					fc_db::Backend::KeyValue(b) => b.clone(),
903					fc_db::Backend::Sql(b) => b.clone(),
904				},
905				graph: pool.clone(),
906				pool: pool.clone(),
907				is_authority: collator,
908				max_past_logs,
909				max_block_range,
910				fee_history_limit,
911				fee_history_cache: fee_history_cache.clone(),
912				network: network.clone(),
913				sync: sync.clone(),
914				dev_rpc_data: None,
915				block_data_cache: block_data_cache.clone(),
916				overrides: overrides.clone(),
917				forced_parent_hashes,
918			};
919			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
920				client.clone(),
921				keystore.clone(),
922			));
923			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
924				rpc::create_full(
925					deps,
926					subscription_task_executor,
927					Some(crate::rpc::TracingConfig {
928						tracing_requesters: tracing_requesters.clone(),
929						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
930					}),
931					pubsub_notification_sinks.clone(),
932					pending_consensus_data_provider,
933					para_id,
934				)
935				.map_err(Into::into)
936			} else {
937				rpc::create_full(
938					deps,
939					subscription_task_executor,
940					None,
941					pubsub_notification_sinks.clone(),
942					pending_consensus_data_provider,
943					para_id,
944				)
945				.map_err(Into::into)
946			}
947		}
948	};
949
950	sc_service::spawn_tasks(sc_service::SpawnTasksParams {
951		rpc_builder: Box::new(rpc_builder),
952		client: client.clone(),
953		transaction_pool: transaction_pool.clone(),
954		task_manager: &mut task_manager,
955		config: parachain_config,
956		keystore: params.keystore_container.keystore(),
957		backend: backend.clone(),
958		network: network.clone(),
959		sync_service: sync_service.clone(),
960		system_rpc_tx,
961		tx_handler_controller,
962		telemetry: telemetry.as_mut(),
963	})?;
964
965	if let Some(hwbench) = hwbench {
966		sc_sysinfo::print_hwbench(&hwbench);
967
968		if let Some(ref mut telemetry) = telemetry {
969			let telemetry_handle = telemetry.handle();
970			task_manager.spawn_handle().spawn(
971				"telemetry_hwbench",
972				None,
973				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
974			);
975		}
976	}
977
978	let announce_block = {
979		let sync_service = sync_service.clone();
980		Arc::new(move |hash, data| sync_service.announce_block(hash, data))
981	};
982
983	let relay_chain_slot_duration = Duration::from_secs(6);
984	let overseer_handle = relay_chain_interface
985		.overseer_handle()
986		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
987
988	start_relay_chain_tasks(StartRelayChainTasksParams {
989		client: client.clone(),
990		announce_block: announce_block.clone(),
991		para_id,
992		relay_chain_interface: relay_chain_interface.clone(),
993		task_manager: &mut task_manager,
994		da_recovery_profile: if collator {
995			DARecoveryProfile::Collator
996		} else {
997			DARecoveryProfile::FullNode
998		},
999		import_queue: import_queue_service,
1000		relay_chain_slot_duration,
1001		recovery_handle: Box::new(overseer_handle.clone()),
1002		sync_service: sync_service.clone(),
1003		prometheus_registry: prometheus_registry.as_ref(),
1004	})?;
1005
1006	if matches!(block_import, MoonbeamBlockImport::Dev(_)) {
1007		return Err(sc_service::Error::Other(
1008			"Block import pipeline is not for parachain".into(),
1009		));
1010	}
1011
1012	if collator {
1013		start_consensus::<RuntimeApi, _>(
1014			backend.clone(),
1015			client.clone(),
1016			block_import,
1017			prometheus_registry.as_ref(),
1018			telemetry.as_ref().map(|t| t.handle()),
1019			&task_manager,
1020			relay_chain_interface.clone(),
1021			transaction_pool,
1022			params.keystore_container.keystore(),
1023			para_id,
1024			collator_key.expect("Command line arguments do not allow this. qed"),
1025			overseer_handle,
1026			announce_block,
1027			force_authoring,
1028			relay_chain_slot_duration,
1029			block_authoring_duration,
1030			sync_service.clone(),
1031			node_extra_args,
1032		)?;
1033	}
1034
1035	Ok((task_manager, client))
1036}
1037
1038fn start_consensus<RuntimeApi, SO>(
1039	backend: Arc<FullBackend>,
1040	client: Arc<FullClient<RuntimeApi>>,
1041	block_import: MoonbeamBlockImport<FullClient<RuntimeApi>>,
1042	prometheus_registry: Option<&Registry>,
1043	telemetry: Option<TelemetryHandle>,
1044	task_manager: &TaskManager,
1045	relay_chain_interface: Arc<dyn RelayChainInterface>,
1046	transaction_pool: Arc<
1047		sc_transaction_pool::TransactionPoolHandle<Block, FullClient<RuntimeApi>>,
1048	>,
1049	keystore: KeystorePtr,
1050	para_id: ParaId,
1051	collator_key: CollatorPair,
1052	overseer_handle: OverseerHandle,
1053	announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
1054	force_authoring: bool,
1055	relay_chain_slot_duration: Duration,
1056	block_authoring_duration: Duration,
1057	sync_oracle: SO,
1058	node_extra_args: NodeExtraArgs,
1059) -> Result<(), sc_service::Error>
1060where
1061	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
1062	RuntimeApi::RuntimeApi: RuntimeApiCollection
1063		+ cumulus_primitives_core::GetCoreSelectorApi<Block>
1064		+ cumulus_primitives_core::RelayParentOffsetApi<Block>,
1065	sc_client_api::StateBackendFor<FullBackend, Block>: sc_client_api::StateBackend<BlakeTwo256>,
1066	SO: SyncOracle + Send + Sync + Clone + 'static,
1067{
1068	let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
1069		task_manager.spawn_handle(),
1070		client.clone(),
1071		transaction_pool,
1072		prometheus_registry,
1073		telemetry.clone(),
1074	);
1075
1076	let proposer = Proposer::new(proposer_factory);
1077	let collator_service = CollatorService::new(
1078		client.clone(),
1079		Arc::new(task_manager.spawn_handle()),
1080		announce_block,
1081		client.clone(),
1082	);
1083
1084	fn create_inherent_data_providers<A, B>(
1085		_: A,
1086		_: B,
1087	) -> impl futures::Future<
1088		Output = Result<
1089			(
1090				nimbus_primitives::InherentDataProvider,
1091				session_keys_primitives::InherentDataProvider,
1092			),
1093			Box<dyn std::error::Error + Send + Sync>,
1094		>,
1095	> {
1096		async move {
1097			let author = nimbus_primitives::InherentDataProvider;
1098			let randomness = session_keys_primitives::InherentDataProvider;
1099			Ok((author, randomness))
1100		}
1101	}
1102
1103	let client_clone = client.clone();
1104	let keystore_clone = keystore.clone();
1105	let maybe_provide_vrf_digest =
1106		move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
1107			moonbeam_vrf::vrf_pre_digest::<Block, FullClient<RuntimeApi>>(
1108				&client_clone,
1109				&keystore_clone,
1110				nimbus_id,
1111				parent,
1112			)
1113		};
1114
1115	log::info!("Collator started with asynchronous backing.");
1116	let client_clone = client.clone();
1117	let code_hash_provider = move |block_hash| {
1118		client_clone
1119			.code_at(block_hash)
1120			.ok()
1121			.map(polkadot_primitives::ValidationCode)
1122			.map(|c| c.hash())
1123	};
1124
1125	match block_import {
1126		MoonbeamBlockImport::ParachainLookahead(bi) => {
1127			let block_import = if node_extra_args.legacy_block_import_strategy {
1128				TParachainBlockImport::new_with_delayed_best_block(bi, backend.clone())
1129			} else {
1130				TParachainBlockImport::new(bi, backend.clone())
1131			};
1132
1133			let params = nimbus_consensus::collators::lookahead::Params {
1134				additional_digests_provider: maybe_provide_vrf_digest,
1135				additional_relay_keys: vec![relay_chain::well_known_keys::EPOCH_INDEX.to_vec()],
1136				authoring_duration: block_authoring_duration,
1137				block_import,
1138				code_hash_provider,
1139				collator_key,
1140				collator_service,
1141				create_inherent_data_providers,
1142				force_authoring,
1143				keystore,
1144				overseer_handle,
1145				para_backend: backend,
1146				para_client: client,
1147				para_id,
1148				proposer,
1149				relay_chain_slot_duration,
1150				relay_client: relay_chain_interface,
1151				slot_duration: None,
1152				sync_oracle,
1153				reinitialize: false,
1154				max_pov_percentage: node_extra_args.max_pov_percentage,
1155			};
1156
1157			task_manager.spawn_essential_handle().spawn(
1158				"nimbus",
1159				None,
1160				nimbus_consensus::collators::lookahead::run::<
1161					Block,
1162					_,
1163					_,
1164					_,
1165					FullBackend,
1166					_,
1167					_,
1168					_,
1169					_,
1170					_,
1171					_,
1172				>(params),
1173			);
1174		}
1175		MoonbeamBlockImport::ParachainSlotBased(bi, handle) => {
1176			let block_import = if node_extra_args.legacy_block_import_strategy {
1177				TParachainBlockImport::new_with_delayed_best_block(bi, backend.clone())
1178			} else {
1179				TParachainBlockImport::new(bi, backend.clone())
1180			};
1181
1182			nimbus_consensus::collators::slot_based::run::<
1183				Block,
1184				nimbus_primitives::NimbusPair,
1185				_,
1186				_,
1187				_,
1188				FullBackend,
1189				_,
1190				_,
1191				_,
1192				_,
1193				_,
1194				_,
1195			>(nimbus_consensus::collators::slot_based::Params {
1196				additional_digests_provider: maybe_provide_vrf_digest,
1197				additional_relay_state_keys: vec![
1198					relay_chain::well_known_keys::EPOCH_INDEX.to_vec()
1199				],
1200				authoring_duration: block_authoring_duration,
1201				block_import,
1202				code_hash_provider,
1203				collator_key,
1204				collator_service,
1205				create_inherent_data_providers: move |b, a| async move {
1206					create_inherent_data_providers(b, a).await
1207				},
1208				force_authoring,
1209				keystore,
1210				para_backend: backend,
1211				para_client: client,
1212				para_id,
1213				proposer,
1214				relay_chain_slot_duration,
1215				relay_client: relay_chain_interface,
1216				para_slot_duration: None,
1217				reinitialize: false,
1218				max_pov_percentage: node_extra_args.max_pov_percentage.map(|p| p as u32),
1219				export_pov: node_extra_args.export_pov,
1220				slot_offset: Duration::from_secs(1),
1221				spawner: task_manager.spawn_essential_handle(),
1222				block_import_handle: handle,
1223			});
1224		}
1225		MoonbeamBlockImport::Dev(_) => {
1226			return Err(sc_service::Error::Other(
1227				"Dev block import should not be used in parachain consensus".into(),
1228			))
1229		}
1230	}
1231
1232	Ok(())
1233}
1234
1235/// Start a normal parachain node.
1236// Rustfmt wants to format the closure with space indentation.
1237#[rustfmt::skip]
1238pub async fn start_node<RuntimeApi, Customizations>(
1239	parachain_config: Configuration,
1240	polkadot_config: Configuration,
1241	collator_options: CollatorOptions,
1242	para_id: ParaId,
1243	rpc_config: RpcConfig,
1244	block_authoring_duration: Duration,
1245	hwbench: Option<sc_sysinfo::HwBench>,
1246	node_extra_args: NodeExtraArgs,
1247) -> sc_service::error::Result<(TaskManager, Arc<FullClient<RuntimeApi>>)>
1248where
1249	RuntimeApi:
1250		ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
1251	RuntimeApi::RuntimeApi:
1252		RuntimeApiCollection + cumulus_primitives_core::GetCoreSelectorApi<Block>
1253		+ cumulus_primitives_core::RelayParentOffsetApi<Block>,
1254	Customizations: ClientCustomizations + 'static,
1255{
1256	start_node_impl::<RuntimeApi, Customizations, sc_network::NetworkWorker<_, _>>(
1257		parachain_config,
1258		polkadot_config,
1259		collator_options,
1260		para_id,
1261		rpc_config,
1262		block_authoring_duration,
1263		hwbench,
1264		node_extra_args
1265	)
1266	.await
1267}
1268
1269/// Builds a new development service. This service uses manual seal, and mocks
1270/// the parachain inherent.
1271pub async fn new_dev<RuntimeApi, Customizations, Net>(
1272	mut config: Configuration,
1273	para_id: Option<u32>,
1274	_author_id: Option<NimbusId>,
1275	sealing: moonbeam_cli_opt::Sealing,
1276	rpc_config: RpcConfig,
1277	hwbench: Option<sc_sysinfo::HwBench>,
1278	node_extra_args: NodeExtraArgs,
1279) -> Result<TaskManager, ServiceError>
1280where
1281	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
1282	RuntimeApi::RuntimeApi: RuntimeApiCollection,
1283	Customizations: ClientCustomizations + 'static,
1284	Net: NetworkBackend<Block, Hash>,
1285{
1286	use async_io::Timer;
1287	use futures::Stream;
1288	use sc_consensus_manual_seal::{run_manual_seal, EngineCommand, ManualSealParams};
1289
1290	let sc_service::PartialComponents {
1291		client,
1292		backend,
1293		mut task_manager,
1294		import_queue,
1295		keystore_container,
1296		select_chain: maybe_select_chain,
1297		transaction_pool,
1298		other:
1299			(
1300				block_import_pipeline,
1301				filter_pool,
1302				mut telemetry,
1303				_telemetry_worker_handle,
1304				frontier_backend,
1305				fee_history_cache,
1306			),
1307	} = new_partial::<RuntimeApi, Customizations>(&mut config, &rpc_config, node_extra_args)?;
1308
1309	let block_import = if let MoonbeamBlockImport::Dev(block_import) = block_import_pipeline {
1310		block_import
1311	} else {
1312		return Err(ServiceError::Other(
1313			"Block import pipeline is not dev".to_string(),
1314		));
1315	};
1316
1317	let prometheus_registry = config.prometheus_registry().cloned();
1318	let net_config =
1319		FullNetworkConfiguration::<_, _, Net>::new(&config.network, prometheus_registry.clone());
1320
1321	let metrics = Net::register_notification_metrics(
1322		config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
1323	);
1324
1325	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
1326		sc_service::build_network(sc_service::BuildNetworkParams {
1327			config: &config,
1328			client: client.clone(),
1329			transaction_pool: transaction_pool.clone(),
1330			spawn_handle: task_manager.spawn_handle(),
1331			import_queue,
1332			block_announce_validator_builder: None,
1333			warp_sync_config: None,
1334			net_config,
1335			block_relay: None,
1336			metrics,
1337		})?;
1338
1339	if config.offchain_worker.enabled {
1340		task_manager.spawn_handle().spawn(
1341			"offchain-workers-runner",
1342			"offchain-work",
1343			sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
1344				runtime_api_provider: client.clone(),
1345				keystore: Some(keystore_container.keystore()),
1346				offchain_db: backend.offchain_storage(),
1347				transaction_pool: Some(OffchainTransactionPoolFactory::new(
1348					transaction_pool.clone(),
1349				)),
1350				network_provider: Arc::new(network.clone()),
1351				is_validator: config.role.is_authority(),
1352				enable_http_requests: true,
1353				custom_extensions: move |_| vec![],
1354			})?
1355			.run(client.clone(), task_manager.spawn_handle())
1356			.boxed(),
1357		);
1358	}
1359
1360	let prometheus_registry = config.prometheus_registry().cloned();
1361	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
1362	let fee_history_limit = rpc_config.fee_history_limit;
1363	let mut command_sink = None;
1364	let mut dev_rpc_data = None;
1365	let collator = config.role.is_authority();
1366
1367	let parachain_id: ParaId = para_id
1368		.expect("para ID should be specified for dev service")
1369		.into();
1370
1371	if collator {
1372		let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
1373			task_manager.spawn_handle(),
1374			client.clone(),
1375			transaction_pool.clone(),
1376			prometheus_registry.as_ref(),
1377			telemetry.as_ref().map(|x| x.handle()),
1378		);
1379		env.set_soft_deadline(SOFT_DEADLINE_PERCENT);
1380
1381		let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
1382			match sealing {
1383				moonbeam_cli_opt::Sealing::Instant => {
1384					Box::new(
1385						// This bit cribbed from the implementation of instant seal.
1386						transaction_pool.import_notification_stream().map(|_| {
1387							EngineCommand::SealNewBlock {
1388								create_empty: false,
1389								finalize: false,
1390								parent_hash: None,
1391								sender: None,
1392							}
1393						}),
1394					)
1395				}
1396				moonbeam_cli_opt::Sealing::Manual => {
1397					let (sink, stream) = futures::channel::mpsc::channel(1000);
1398					// Keep a reference to the other end of the channel. It goes to the RPC.
1399					command_sink = Some(sink);
1400					Box::new(stream)
1401				}
1402				moonbeam_cli_opt::Sealing::Interval(millis) => Box::new(StreamExt::map(
1403					Timer::interval(Duration::from_millis(millis)),
1404					|_| EngineCommand::SealNewBlock {
1405						create_empty: true,
1406						finalize: false,
1407						parent_hash: None,
1408						sender: None,
1409					},
1410				)),
1411			};
1412
1413		let select_chain = maybe_select_chain.expect(
1414			"`new_partial` builds a `LongestChainRule` when building dev service.\
1415				We specified the dev service when calling `new_partial`.\
1416				Therefore, a `LongestChainRule` is present. qed.",
1417		);
1418
1419		// Create channels for mocked XCM messages.
1420		let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
1421		let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
1422		let additional_relay_offset = Arc::new(std::sync::atomic::AtomicU32::new(0));
1423		dev_rpc_data = Some((
1424			downward_xcm_sender,
1425			hrmp_xcm_sender,
1426			additional_relay_offset.clone(),
1427		));
1428
1429		// Need to clone it and store here to avoid moving of `client`
1430		// variable in closure below.
1431		let client_vrf = client.clone();
1432
1433		let keystore_clone = keystore_container.keystore().clone();
1434		let maybe_provide_vrf_digest =
1435			move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
1436				moonbeam_vrf::vrf_pre_digest::<Block, FullClient<RuntimeApi>>(
1437					&client_vrf,
1438					&keystore_clone,
1439					nimbus_id,
1440					parent,
1441				)
1442			};
1443
1444		// Need to clone it and store here to avoid moving of `client`
1445		// variable in closure below.
1446		let client_for_cidp = client.clone();
1447
1448		task_manager.spawn_essential_handle().spawn_blocking(
1449			"authorship_task",
1450			Some("block-authoring"),
1451			run_manual_seal(ManualSealParams {
1452				block_import,
1453				env,
1454				client: client.clone(),
1455				pool: transaction_pool.clone(),
1456				commands_stream,
1457				select_chain,
1458				consensus_data_provider: Some(Box::new(NimbusManualSealConsensusDataProvider {
1459					keystore: keystore_container.keystore(),
1460					client: client.clone(),
1461					additional_digests_provider: maybe_provide_vrf_digest,
1462					_phantom: Default::default(),
1463				})),
1464				create_inherent_data_providers: move |block: H256, ()| {
1465					let maybe_current_para_block = client_for_cidp.number(block);
1466					let maybe_current_para_head = client_for_cidp.expect_header(block);
1467					let downward_xcm_receiver = downward_xcm_receiver.clone();
1468					let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();
1469					let additional_relay_offset = additional_relay_offset.clone();
1470					let relay_slot_key = well_known_keys::CURRENT_SLOT.to_vec();
1471
1472					// Need to clone it and store here to avoid moving of `client`
1473					// variable in closure below.
1474					let client_for_xcm = client_for_cidp.clone();
1475
1476					async move {
1477						MockTimestampInherentDataProvider::advance_timestamp(
1478							RELAY_CHAIN_SLOT_DURATION_MILLIS,
1479						);
1480
1481						let current_para_block = maybe_current_para_block?
1482							.ok_or(sp_blockchain::Error::UnknownBlock(block.to_string()))?;
1483
1484						let current_para_block_head = Some(polkadot_primitives::HeadData(
1485							maybe_current_para_head?.encode(),
1486						));
1487
1488						// Get the mocked timestamp
1489						let timestamp = TIMESTAMP.load(Ordering::SeqCst);
1490						// Calculate mocked slot number
1491						let slot = timestamp.saturating_div(RELAY_CHAIN_SLOT_DURATION_MILLIS);
1492
1493						let additional_key_values = vec![
1494							(relay_slot_key, Slot::from(slot).encode()),
1495							(
1496								relay_chain::well_known_keys::ACTIVE_CONFIG.to_vec(),
1497								AbridgedHostConfiguration {
1498									max_code_size: 3_145_728,
1499									max_head_data_size: 20_480,
1500									max_upward_queue_count: 174_762,
1501									max_upward_queue_size: 1_048_576,
1502									max_upward_message_size: 65_531,
1503									max_upward_message_num_per_candidate: 16,
1504									hrmp_max_message_num_per_candidate: 10,
1505									validation_upgrade_cooldown: 6,
1506									validation_upgrade_delay: 6,
1507									async_backing_params: AsyncBackingParams {
1508										max_candidate_depth: 3,
1509										allowed_ancestry_len: 2,
1510									},
1511								}
1512								.encode(),
1513							),
1514						];
1515
1516						let current_para_head = client_for_xcm
1517							.header(block)
1518							.expect("Header lookup should succeed")
1519							.expect("Header passed in as parent should be present in backend.");
1520
1521						let should_send_go_ahead = match client_for_xcm
1522							.runtime_api()
1523							.collect_collation_info(block, &current_para_head)
1524						{
1525							Ok(info) => info.new_validation_code.is_some(),
1526							Err(e) => {
1527								log::error!("Failed to collect collation info: {:?}", e);
1528
1529								false
1530							}
1531						};
1532
1533						let mocked_parachain = MockValidationDataInherentDataProvider {
1534							current_para_block,
1535							para_id: parachain_id,
1536							upgrade_go_ahead: should_send_go_ahead.then(|| {
1537								log::info!(
1538									"Detected pending validation code, sending go-ahead signal."
1539								);
1540
1541								UpgradeGoAhead::GoAhead
1542							}),
1543							current_para_block_head,
1544							relay_offset: additional_relay_offset.load(Ordering::SeqCst),
1545							relay_blocks_per_para_block: 1,
1546							para_blocks_per_relay_epoch: 10,
1547							relay_randomness_config: (),
1548							xcm_config: MockXcmConfig::new(
1549								&*client_for_xcm,
1550								block,
1551								Default::default(),
1552							),
1553							raw_downward_messages: downward_xcm_receiver.drain().collect(),
1554							raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
1555							additional_key_values: Some(additional_key_values),
1556						};
1557
1558						let randomness = session_keys_primitives::InherentDataProvider;
1559
1560						Ok((
1561							MockTimestampInherentDataProvider,
1562							mocked_parachain,
1563							randomness,
1564						))
1565					}
1566				},
1567			}),
1568		);
1569	}
1570
1571	// Sinks for pubsub notifications.
1572	// Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
1573	// The MappingSyncWorker sends through the channel on block import and the subscription emits a
1574	// notification to the subscriber on receiving a message through this channel.
1575	// This way we avoid race conditions when using native substrate block import notification
1576	// stream.
1577	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
1578		fc_mapping_sync::EthereumBlockNotification<Block>,
1579	> = Default::default();
1580	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1581
1582	rpc::spawn_essential_tasks(
1583		rpc::SpawnTasksParams {
1584			task_manager: &task_manager,
1585			client: client.clone(),
1586			substrate_backend: backend.clone(),
1587			frontier_backend: frontier_backend.clone(),
1588			filter_pool: filter_pool.clone(),
1589			overrides: overrides.clone(),
1590			fee_history_limit,
1591			fee_history_cache: fee_history_cache.clone(),
1592		},
1593		sync_service.clone(),
1594		pubsub_notification_sinks.clone(),
1595	);
1596	let ethapi_cmd = rpc_config.ethapi.clone();
1597	let tracing_requesters =
1598		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
1599			rpc::tracing::spawn_tracing_tasks(
1600				&rpc_config,
1601				prometheus_registry.clone(),
1602				rpc::SpawnTasksParams {
1603					task_manager: &task_manager,
1604					client: client.clone(),
1605					substrate_backend: backend.clone(),
1606					frontier_backend: frontier_backend.clone(),
1607					filter_pool: filter_pool.clone(),
1608					overrides: overrides.clone(),
1609					fee_history_limit,
1610					fee_history_cache: fee_history_cache.clone(),
1611				},
1612			)
1613		} else {
1614			rpc::tracing::RpcRequesters {
1615				debug: None,
1616				trace: None,
1617			}
1618		};
1619
1620	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
1621		task_manager.spawn_handle(),
1622		overrides.clone(),
1623		rpc_config.eth_log_block_cache,
1624		rpc_config.eth_statuses_cache,
1625		prometheus_registry,
1626	));
1627
1628	let rpc_builder = {
1629		let client = client.clone();
1630		let pool = transaction_pool.clone();
1631		let backend = backend.clone();
1632		let network = network.clone();
1633		let sync = sync_service.clone();
1634		let ethapi_cmd = ethapi_cmd.clone();
1635		let max_past_logs = rpc_config.max_past_logs;
1636		let max_block_range = rpc_config.max_block_range;
1637		let overrides = overrides.clone();
1638		let fee_history_cache = fee_history_cache.clone();
1639		let block_data_cache = block_data_cache.clone();
1640		let pubsub_notification_sinks = pubsub_notification_sinks.clone();
1641
1642		let keystore = keystore_container.keystore();
1643		move |subscription_task_executor| {
1644			let deps = rpc::FullDeps {
1645				backend: backend.clone(),
1646				client: client.clone(),
1647				command_sink: command_sink.clone(),
1648				ethapi_cmd: ethapi_cmd.clone(),
1649				filter_pool: filter_pool.clone(),
1650				frontier_backend: match &*frontier_backend {
1651					fc_db::Backend::KeyValue(b) => b.clone(),
1652					fc_db::Backend::Sql(b) => b.clone(),
1653				},
1654				graph: pool.clone(),
1655				pool: pool.clone(),
1656				is_authority: collator,
1657				max_past_logs,
1658				max_block_range,
1659				fee_history_limit,
1660				fee_history_cache: fee_history_cache.clone(),
1661				network: network.clone(),
1662				sync: sync.clone(),
1663				dev_rpc_data: dev_rpc_data.clone(),
1664				overrides: overrides.clone(),
1665				block_data_cache: block_data_cache.clone(),
1666				forced_parent_hashes: None,
1667			};
1668
1669			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
1670				client.clone(),
1671				keystore.clone(),
1672			));
1673			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
1674				rpc::create_full(
1675					deps,
1676					subscription_task_executor,
1677					Some(crate::rpc::TracingConfig {
1678						tracing_requesters: tracing_requesters.clone(),
1679						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
1680					}),
1681					pubsub_notification_sinks.clone(),
1682					pending_consensus_data_provider,
1683					parachain_id,
1684				)
1685				.map_err(Into::into)
1686			} else {
1687				rpc::create_full(
1688					deps,
1689					subscription_task_executor,
1690					None,
1691					pubsub_notification_sinks.clone(),
1692					pending_consensus_data_provider,
1693					parachain_id,
1694				)
1695				.map_err(Into::into)
1696			}
1697		}
1698	};
1699
1700	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
1701		network,
1702		client,
1703		keystore: keystore_container.keystore(),
1704		task_manager: &mut task_manager,
1705		transaction_pool,
1706		rpc_builder: Box::new(rpc_builder),
1707		backend,
1708		system_rpc_tx,
1709		sync_service: sync_service.clone(),
1710		config,
1711		tx_handler_controller,
1712		telemetry: None,
1713	})?;
1714
1715	if let Some(hwbench) = hwbench {
1716		sc_sysinfo::print_hwbench(&hwbench);
1717
1718		if let Some(ref mut telemetry) = telemetry {
1719			let telemetry_handle = telemetry.handle();
1720			task_manager.spawn_handle().spawn(
1721				"telemetry_hwbench",
1722				None,
1723				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
1724			);
1725		}
1726	}
1727
1728	log::info!("Development Service Ready");
1729
1730	Ok(task_manager)
1731}
1732
1733#[cfg(test)]
1734mod tests {
1735	use crate::chain_spec::moonbase::ChainSpec;
1736	use crate::chain_spec::Extensions;
1737	use jsonrpsee::server::BatchRequestConfig;
1738	use moonbase_runtime::AccountId;
1739	use prometheus::{proto::LabelPair, Counter};
1740	use sc_network::config::NetworkConfiguration;
1741	use sc_service::config::RpcConfiguration;
1742	use sc_service::ChainType;
1743	use sc_service::{
1744		config::{BasePath, DatabaseSource, KeystoreConfig},
1745		Configuration, Role,
1746	};
1747	use std::path::Path;
1748	use std::str::FromStr;
1749
1750	use super::*;
1751
1752	#[test]
1753	fn test_set_prometheus_registry_uses_moonbeam_prefix() {
1754		let counter_name = "my_counter";
1755		let expected_metric_name = "moonbeam_my_counter";
1756		let counter = Box::new(Counter::new(counter_name, "foobar").unwrap());
1757		let mut config = Configuration {
1758			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
1759				"0.0.0.0:8080".parse().unwrap(),
1760				"".into(),
1761			)),
1762			..test_config("test")
1763		};
1764
1765		set_prometheus_registry(&mut config, false).unwrap();
1766		// generate metric
1767		let reg = config.prometheus_registry().unwrap();
1768		reg.register(counter.clone()).unwrap();
1769		counter.inc();
1770
1771		let actual_metric_name = reg.gather().first().unwrap().get_name().to_string();
1772		assert_eq!(actual_metric_name.as_str(), expected_metric_name);
1773	}
1774
1775	#[test]
1776	fn test_set_prometheus_registry_skips_moonbeam_prefix() {
1777		let counter_name = "my_counter";
1778		let counter = Box::new(Counter::new(counter_name, "foobar").unwrap());
1779		let mut config = Configuration {
1780			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
1781				"0.0.0.0:8080".parse().unwrap(),
1782				"".into(),
1783			)),
1784			..test_config("test")
1785		};
1786
1787		set_prometheus_registry(&mut config, true).unwrap();
1788		// generate metric
1789		let reg = config.prometheus_registry().unwrap();
1790		reg.register(counter.clone()).unwrap();
1791		counter.inc();
1792
1793		let actual_metric_name = reg.gather().first().unwrap().get_name().to_string();
1794		assert_eq!(actual_metric_name.as_str(), counter_name);
1795	}
1796
1797	#[test]
1798	fn test_set_prometheus_registry_adds_chain_id_as_label() {
1799		let input_chain_id = "moonriver";
1800
1801		let mut expected_label = LabelPair::default();
1802		expected_label.set_name("chain".to_owned());
1803		expected_label.set_value("moonriver".to_owned());
1804		let expected_chain_label = Some(expected_label);
1805
1806		let counter = Box::new(Counter::new("foo", "foobar").unwrap());
1807		let mut config = Configuration {
1808			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
1809				"0.0.0.0:8080".parse().unwrap(),
1810				"".into(),
1811			)),
1812			..test_config(input_chain_id)
1813		};
1814
1815		set_prometheus_registry(&mut config, false).unwrap();
1816		// generate metric
1817		let reg = config.prometheus_registry().unwrap();
1818		reg.register(counter.clone()).unwrap();
1819		counter.inc();
1820
1821		let actual_chain_label = reg
1822			.gather()
1823			.first()
1824			.unwrap()
1825			.get_metric()
1826			.first()
1827			.unwrap()
1828			.get_label()
1829			.into_iter()
1830			.find(|x| x.get_name() == "chain")
1831			.cloned();
1832
1833		assert_eq!(actual_chain_label, expected_chain_label);
1834	}
1835
1836	#[test]
1837	fn dalek_does_not_panic() {
1838		use futures::executor::block_on;
1839		use sc_block_builder::BlockBuilderBuilder;
1840		use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, DatabaseSource, PruningMode};
1841		use sp_api::ProvideRuntimeApi;
1842		use sp_consensus::BlockOrigin;
1843		use substrate_test_runtime::TestAPI;
1844		use substrate_test_runtime_client::runtime::Block;
1845		use substrate_test_runtime_client::{
1846			ClientBlockImportExt, TestClientBuilder, TestClientBuilderExt,
1847		};
1848
1849		fn zero_ed_pub() -> sp_core::ed25519::Public {
1850			sp_core::ed25519::Public::default()
1851		}
1852
1853		// This is an invalid signature
1854		// this breaks after ed25519 1.3. It makes the signature panic at creation
1855		// This test ensures we should never panic
1856		fn invalid_sig() -> sp_core::ed25519::Signature {
1857			let signature = hex_literal::hex!(
1858				"a25b94f9c64270fdfffa673f11cfe961633e3e4972e6940a3cf
1859		7351dd90b71447041a83583a52cee1cf21b36ba7fd1d0211dca58b48d997fc78d9bc82ab7a38e"
1860			);
1861			sp_core::ed25519::Signature::from_raw(signature[0..64].try_into().unwrap())
1862		}
1863
1864		let tmp = tempfile::tempdir().unwrap();
1865		let backend = Arc::new(
1866			Backend::new(
1867				DatabaseSettings {
1868					trie_cache_maximum_size: Some(1 << 20),
1869					state_pruning: Some(PruningMode::ArchiveAll),
1870					blocks_pruning: BlocksPruning::KeepAll,
1871					source: DatabaseSource::RocksDb {
1872						path: tmp.path().into(),
1873						cache_size: 1024,
1874					},
1875					metrics_registry: None,
1876				},
1877				u64::MAX,
1878			)
1879			.unwrap(),
1880		);
1881		let client = TestClientBuilder::with_backend(backend).build();
1882
1883		client
1884			.execution_extensions()
1885			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
1886			Block,
1887			sp_io::UseDalekExt,
1888		>::new(1));
1889
1890		let a1 = BlockBuilderBuilder::new(&client)
1891			.on_parent_block(client.chain_info().genesis_hash)
1892			.with_parent_block_number(0)
1893			// Enable proof recording if required. This call is optional.
1894			.enable_proof_recording()
1895			.build()
1896			.unwrap()
1897			.build()
1898			.unwrap()
1899			.block;
1900
1901		block_on(client.import(BlockOrigin::NetworkInitialSync, a1.clone())).unwrap();
1902
1903		// On block zero it will use dalek
1904		// shouldnt panic on importing invalid sig
1905		assert!(!client
1906			.runtime_api()
1907			.verify_ed25519(
1908				client.chain_info().genesis_hash,
1909				invalid_sig(),
1910				zero_ed_pub(),
1911				vec![]
1912			)
1913			.unwrap());
1914	}
1915
1916	fn test_config(chain_id: &str) -> Configuration {
1917		let network_config = NetworkConfiguration::new("", "", Default::default(), None);
1918		let runtime = tokio::runtime::Runtime::new().expect("failed creating tokio runtime");
1919		let spec = ChainSpec::builder(&[0u8], Extensions::default())
1920			.with_name("test")
1921			.with_id(chain_id)
1922			.with_chain_type(ChainType::Local)
1923			.with_genesis_config(moonbase_runtime::genesis_config_preset::testnet_genesis(
1924				AccountId::from_str("6Be02d1d3665660d22FF9624b7BE0551ee1Ac91b").unwrap(),
1925				vec![],
1926				vec![],
1927				vec![],
1928				vec![],
1929				vec![],
1930				ParaId::new(0),
1931				0,
1932			))
1933			.build();
1934
1935		Configuration {
1936			impl_name: String::from("test-impl"),
1937			impl_version: String::from("0.1"),
1938			role: Role::Full,
1939			tokio_handle: runtime.handle().clone(),
1940			transaction_pool: Default::default(),
1941			network: network_config,
1942			keystore: KeystoreConfig::Path {
1943				path: "key".into(),
1944				password: None,
1945			},
1946			database: DatabaseSource::RocksDb {
1947				path: "db".into(),
1948				cache_size: 128,
1949			},
1950			trie_cache_maximum_size: Some(16777216),
1951			warm_up_trie_cache: None,
1952			state_pruning: Default::default(),
1953			blocks_pruning: sc_service::BlocksPruning::KeepAll,
1954			chain_spec: Box::new(spec),
1955			executor: Default::default(),
1956			wasm_runtime_overrides: Default::default(),
1957			rpc: RpcConfiguration {
1958				addr: None,
1959				max_connections: Default::default(),
1960				cors: None,
1961				methods: Default::default(),
1962				max_request_size: Default::default(),
1963				max_response_size: Default::default(),
1964				id_provider: None,
1965				max_subs_per_conn: Default::default(),
1966				port: Default::default(),
1967				message_buffer_capacity: Default::default(),
1968				batch_config: BatchRequestConfig::Unlimited,
1969				rate_limit: Default::default(),
1970				rate_limit_whitelisted_ips: vec![],
1971				rate_limit_trust_proxy_headers: false,
1972			},
1973			data_path: Default::default(),
1974			prometheus_config: None,
1975			telemetry_endpoints: None,
1976			offchain_worker: Default::default(),
1977			force_authoring: false,
1978			disable_grandpa: false,
1979			dev_key_seed: None,
1980			tracing_targets: None,
1981			tracing_receiver: Default::default(),
1982			announce_block: true,
1983			base_path: BasePath::new(Path::new("")),
1984		}
1985	}
1986}
1987
/// Consensus data provider used when building pending blocks.
///
/// Its `fc_rpc::pending::ConsensusDataProvider` implementation derives the digest of a
/// pending block from the parent block's digest, refreshing the VRF pre-runtime item.
struct PendingConsensusDataProvider<Client>
where
	Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
	Client::Api: VrfApi<Block>,
{
	/// Client handle, used to look up headers and handed to the VRF digest provider.
	client: Arc<Client>,
	/// Keystore handed to the VRF digest provider.
	keystore: Arc<dyn Keystore>,
}
1996
1997impl<Client> PendingConsensusDataProvider<Client>
1998where
1999	Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
2000	Client::Api: VrfApi<Block>,
2001{
2002	pub fn new(client: Arc<Client>, keystore: Arc<dyn Keystore>) -> Self {
2003		Self { client, keystore }
2004	}
2005}
2006
2007impl<Client> fc_rpc::pending::ConsensusDataProvider<Block> for PendingConsensusDataProvider<Client>
2008where
2009	Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
2010	Client::Api: VrfApi<Block>,
2011{
2012	fn create_digest(
2013		&self,
2014		parent: &Header,
2015		_data: &sp_inherents::InherentData,
2016	) -> Result<sp_runtime::Digest, sp_inherents::Error> {
2017		let hash = parent.hash();
2018		// Get the digest from the best block header.
2019		let mut digest = self
2020			.client
2021			.header(hash)
2022			.map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
2023			.ok_or(sp_inherents::Error::Application(
2024				"Best block header should be present".into(),
2025			))?
2026			.digest;
2027		// Get the nimbus id from the digest.
2028		let nimbus_id = digest
2029			.logs
2030			.iter()
2031			.find_map(|x| {
2032				if let DigestItem::PreRuntime(nimbus_primitives::NIMBUS_ENGINE_ID, nimbus_id) = x {
2033					Some(NimbusId::from_slice(nimbus_id.as_slice()).map_err(|_| {
2034						sp_inherents::Error::Application(
2035							"Nimbus pre-runtime digest should be valid".into(),
2036						)
2037					}))
2038				} else {
2039					None
2040				}
2041			})
2042			.ok_or(sp_inherents::Error::Application(
2043				"Nimbus pre-runtime digest should be present".into(),
2044			))??;
2045		// Remove the old VRF digest.
2046		let pos = digest.logs.iter().position(|x| {
2047			matches!(
2048				x,
2049				DigestItem::PreRuntime(session_keys_primitives::VRF_ENGINE_ID, _)
2050			)
2051		});
2052		if let Some(pos) = pos {
2053			digest.logs.remove(pos);
2054		}
2055		// Create the VRF digest.
2056		let vrf_digest = VrfDigestsProvider::new(self.client.clone(), self.keystore.clone())
2057			.provide_digests(nimbus_id, hash);
2058		// Append the VRF digest to the digest.
2059		digest.logs.extend(vrf_digest);
2060		Ok(digest)
2061	}
2062}