moonbeam_service/lazy_loading/
mod.rs

1// Copyright 2024 Moonbeam foundation
2// This file is part of Moonbeam.
3
4// Moonbeam is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Moonbeam is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::chain_spec::Extensions;
18use crate::{
19	lazy_loading, open_frontier_backend, rpc, set_prometheus_registry, ClientCustomizations,
20	FrontierBlockImport, HostFunctions, MockTimestampInherentDataProvider, MoonbeamBlockImport,
21	PartialComponentsResult, PendingConsensusDataProvider, RuntimeApiCollection,
22	RELAY_CHAIN_SLOT_DURATION_MILLIS, SOFT_DEADLINE_PERCENT, TIMESTAMP,
23};
24use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
25use cumulus_primitives_core::{relay_chain, BlockT, CollectCollationInfo, ParaId};
26use fc_rpc::StorageOverrideHandler;
27use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
28use frontier_backend::LazyLoadingFrontierBackend;
29use futures::{FutureExt, StreamExt};
30use moonbeam_cli_opt::{EthApi as EthApiCmd, LazyLoadingConfig, RpcConfig};
31use moonbeam_core_primitives::{Block, Hash};
32use nimbus_consensus::NimbusManualSealConsensusDataProvider;
33use nimbus_primitives::NimbusId;
34use parity_scale_codec::Encode;
35use polkadot_primitives::{AbridgedHostConfiguration, AsyncBackingParams, Slot, UpgradeGoAhead};
36use sc_chain_spec::{get_extension, BuildGenesisBlock, ChainType, GenesisBlockBuilder};
37use sc_client_api::{Backend, BadBlocks, ExecutorProvider, ForkBlocks};
38use sc_executor::{HeapAllocStrategy, RuntimeVersionOf, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
39use sc_network::config::FullNetworkConfiguration;
40use sc_network::NetworkBackend;
41use sc_network_common::sync::SyncMode;
42use sc_service::{
43	error::Error as ServiceError, ClientConfig, Configuration, Error, KeystoreContainer,
44	LocalCallExecutor, PartialComponents, TaskManager,
45};
46use sc_telemetry::{TelemetryHandle, TelemetryWorker};
47use sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool};
48use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi};
49use sp_blockchain::HeaderBackend;
50use sp_core::traits::CodeExecutor;
51use sp_core::H256;
52use sp_runtime::traits::NumberFor;
53use std::collections::BTreeMap;
54use std::str::FromStr;
55use std::sync::atomic::Ordering;
56use std::sync::{Arc, Mutex};
57use std::time::Duration;
58
// Submodules of the lazy loading service. `call_executor`, `frontier_backend` and
// `substrate_backend` are exported publicly; the remaining modules are private to
// this module.
pub mod call_executor;
mod client;
pub mod frontier_backend;
mod helpers;
mod lock;
mod manual_sealing;
mod rpc_client;
mod state_overrides;
pub mod substrate_backend;
68
/// Name of the log target for lazy loading related log messages.
// `'static` is implied for references in `const` items, so it is not spelled out.
pub const LAZY_LOADING_LOG_TARGET: &str = "lazy-loading";
70
/// Lazy loading client type.
///
/// A `sc_service::client::Client` parameterised with the lazy loading backend
/// ([`TLazyLoadingBackend`]) and the lazy loading call executor
/// ([`TLazyLoadingCallExecutor`]).
pub type TLazyLoadingClient<TBl, TRtApi, TExec> = sc_service::client::Client<
	TLazyLoadingBackend<TBl>,
	TLazyLoadingCallExecutor<TBl, TExec>,
	TBl,
	TRtApi,
>;

/// Lazy loading client backend type (the `Backend` defined in `substrate_backend`).
pub type TLazyLoadingBackend<TBl> = substrate_backend::Backend<TBl>;

/// Lazy loading client call executor type.
///
/// `call_executor::LazyLoadingCallExecutor` parameterised with a `LocalCallExecutor`
/// that runs on top of the lazy loading backend.
pub type TLazyLoadingCallExecutor<TBl, TExec> = call_executor::LazyLoadingCallExecutor<
	TBl,
	LocalCallExecutor<TBl, TLazyLoadingBackend<TBl>, TExec>,
>;

/// Lazy loading parts type.
///
/// Tuple of `(client, backend, keystore container, task manager)`, in that order.
pub type TLazyLoadingParts<TBl, TRtApi, TExec> = (
	TLazyLoadingClient<TBl, TRtApi, TExec>,
	Arc<TLazyLoadingBackend<TBl>>,
	KeystoreContainer,
	TaskManager,
);

// Lazy loading client specialised for the Moonbeam `Block`, using a `WasmExecutor`
// with the node's `HostFunctions` as executor.
type LazyLoadingClient<RuntimeApi> =
	TLazyLoadingClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
// Lazy loading backend specialised for the Moonbeam `Block`.
type LazyLoadingBackend = TLazyLoadingBackend<Block>;
99
100/// Create the initial parts of a lazy loading node.
101pub fn new_lazy_loading_parts<TBl, TRtApi, TExec>(
102	config: &mut Configuration,
103	lazy_loading_config: &LazyLoadingConfig,
104	telemetry: Option<TelemetryHandle>,
105	executor: TExec,
106) -> Result<TLazyLoadingParts<TBl, TRtApi, TExec>, Error>
107where
108	TBl: BlockT + sp_runtime::DeserializeOwned,
109	TBl::Hash: From<H256>,
110	TExec: CodeExecutor + RuntimeVersionOf + Clone,
111{
112	let backend = substrate_backend::new_backend(config, &lazy_loading_config)?;
113
114	let genesis_block_builder = GenesisBlockBuilder::new(
115		config.chain_spec.as_storage_builder(),
116		!config.no_genesis(),
117		backend.clone(),
118		executor.clone(),
119	)?;
120
121	new_lazy_loading_parts_with_genesis_builder(
122		config,
123		telemetry,
124		executor,
125		backend,
126		genesis_block_builder,
127	)
128}
129
130/// Create the initial parts of a lazy loading node.
131pub fn new_lazy_loading_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
132	config: &Configuration,
133	telemetry: Option<TelemetryHandle>,
134	executor: TExec,
135	backend: Arc<TLazyLoadingBackend<TBl>>,
136	genesis_block_builder: TBuildGenesisBlock,
137) -> Result<TLazyLoadingParts<TBl, TRtApi, TExec>, Error>
138where
139	TBl: BlockT + sp_runtime::DeserializeOwned,
140	TBl::Hash: From<H256>,
141	TExec: CodeExecutor + RuntimeVersionOf + Clone,
142	TBuildGenesisBlock:
143		BuildGenesisBlock<
144			TBl,
145			BlockImportOperation = <TLazyLoadingBackend<TBl> as sc_client_api::backend::Backend<
146				TBl,
147			>>::BlockImportOperation,
148		>,
149{
150	let keystore_container = KeystoreContainer::new(&config.keystore)?;
151
152	let task_manager = {
153		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
154		TaskManager::new(config.tokio_handle.clone(), registry)?
155	};
156
157	let chain_spec = &config.chain_spec;
158	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
159		.cloned()
160		.unwrap_or_default();
161
162	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
163		.cloned()
164		.unwrap_or_default();
165
166	let client = {
167		let extensions = sc_client_api::execution_extensions::ExecutionExtensions::new(
168			None,
169			Arc::new(executor.clone()),
170		);
171
172		let wasm_runtime_substitutes = config
173			.chain_spec
174			.code_substitutes()
175			.into_iter()
176			.map(|(n, c)| {
177				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
178					Error::Application(Box::from(format!(
179						"Failed to parse `{}` as block number for code substitutes. \
180						 In an old version the key for code substitute was a block hash. \
181						 Please update the chain spec to a version that is compatible with your node.",
182						n
183					)))
184				})?;
185				Ok((number, c))
186			})
187			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
188
189		let client = client::new_client(
190			backend.clone(),
191			executor,
192			genesis_block_builder,
193			fork_blocks,
194			bad_blocks,
195			extensions,
196			Box::new(task_manager.spawn_handle()),
197			config
198				.prometheus_config
199				.as_ref()
200				.map(|config| config.registry.clone()),
201			telemetry,
202			ClientConfig {
203				offchain_worker_enabled: config.offchain_worker.enabled,
204				offchain_indexing_api: config.offchain_worker.indexing_enabled,
205				wasmtime_precompiled: config.executor.wasmtime_precompiled.clone(),
206				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
207				no_genesis: matches!(
208					config.network.sync_mode,
209					SyncMode::LightState { .. } | SyncMode::Warp { .. }
210				),
211				wasm_runtime_substitutes,
212				enable_import_proof_recording: true,
213			},
214		)?;
215
216		client
217	};
218
219	Ok((client, backend, keystore_container, task_manager))
220}
221
222/// Builds the PartialComponents for a lazy loading node.
223#[allow(clippy::type_complexity)]
224pub fn new_lazy_loading_partial<RuntimeApi, Customizations>(
225	config: &mut Configuration,
226	rpc_config: &RpcConfig,
227	lazy_loading_config: &LazyLoadingConfig,
228) -> PartialComponentsResult<LazyLoadingClient<RuntimeApi>, LazyLoadingBackend>
229where
230	RuntimeApi: ConstructRuntimeApi<Block, LazyLoadingClient<RuntimeApi>> + Send + Sync + 'static,
231	RuntimeApi::RuntimeApi: RuntimeApiCollection,
232	Customizations: ClientCustomizations + 'static,
233{
234	set_prometheus_registry(config, rpc_config.no_prometheus_prefix)?;
235
236	// Use ethereum style for subscription ids
237	config.rpc.id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));
238
239	let telemetry = config
240		.telemetry_endpoints
241		.clone()
242		.filter(|x| !x.is_empty())
243		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
244			let worker = TelemetryWorker::new(16)?;
245			let telemetry = worker.handle().new_telemetry(endpoints);
246			Ok((worker, telemetry))
247		})
248		.transpose()?;
249
250	let heap_pages = config
251		.executor
252		.default_heap_pages
253		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
254			extra_pages: h as _,
255		});
256	let mut wasm_builder = WasmExecutor::builder()
257		.with_execution_method(config.executor.wasm_method)
258		.with_onchain_heap_alloc_strategy(heap_pages)
259		.with_offchain_heap_alloc_strategy(heap_pages)
260		.with_ignore_onchain_heap_pages(true)
261		.with_max_runtime_instances(config.executor.max_runtime_instances)
262		.with_runtime_cache_size(config.executor.runtime_cache_size);
263
264	if let Some(ref wasmtime_precompiled_path) = config.executor.wasmtime_precompiled {
265		wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
266	}
267
268	let executor = wasm_builder.build();
269
270	let (client, backend, keystore_container, task_manager) =
271		new_lazy_loading_parts::<Block, RuntimeApi, _>(
272			config,
273			lazy_loading_config,
274			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
275			executor,
276		)?;
277
278	if let Some(block_number) = Customizations::first_block_number_compatible_with_ed25519_zebra() {
279		client
280			.execution_extensions()
281			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
282			Block,
283			sp_io::UseDalekExt,
284		>::new(block_number));
285	}
286
287	let client = Arc::new(client);
288
289	let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
290
291	let telemetry = telemetry.map(|(worker, telemetry)| {
292		task_manager
293			.spawn_handle()
294			.spawn("telemetry", None, worker.run());
295		telemetry
296	});
297
298	let maybe_select_chain = Some(sc_consensus::LongestChain::new(backend.clone()));
299
300	let transaction_pool = sc_transaction_pool::Builder::new(
301		task_manager.spawn_essential_handle(),
302		client.clone(),
303		config.role.is_authority().into(),
304	)
305	.with_options(config.transaction_pool.clone())
306	.with_prometheus(config.prometheus_registry())
307	.build();
308
309	let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
310	let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));
311
312	let frontier_backend = Arc::new(open_frontier_backend(client.clone(), config, rpc_config)?);
313	let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());
314
315	let create_inherent_data_providers = move |_, _| async move { Ok(()) };
316
317	let import_queue = nimbus_consensus::import_queue(
318		client.clone(),
319		frontier_block_import.clone(),
320		create_inherent_data_providers,
321		&task_manager.spawn_essential_handle(),
322		config.prometheus_registry(),
323		false,
324		false,
325	)?;
326	let block_import = MoonbeamBlockImport::Dev(frontier_block_import);
327
328	Ok(PartialComponents {
329		backend,
330		client,
331		import_queue,
332		keystore_container,
333		task_manager,
334		transaction_pool: transaction_pool.into(),
335		select_chain: maybe_select_chain,
336		other: (
337			block_import,
338			filter_pool,
339			telemetry,
340			telemetry_worker_handle,
341			frontier_backend,
342			fee_history_cache,
343		),
344	})
345}
346
347/// Builds a new lazy loading service. This service uses manual seal, and mocks
348/// the parachain inherent.
349#[sc_tracing::logging::prefix_logs_with("Lazy loading 🌗")]
350pub async fn new_lazy_loading_service<RuntimeApi, Customizations, Net>(
351	mut config: Configuration,
352	_author_id: Option<NimbusId>,
353	sealing: moonbeam_cli_opt::Sealing,
354	rpc_config: RpcConfig,
355	lazy_loading_config: LazyLoadingConfig,
356	hwbench: Option<sc_sysinfo::HwBench>,
357) -> Result<TaskManager, ServiceError>
358where
359	RuntimeApi: ConstructRuntimeApi<Block, LazyLoadingClient<RuntimeApi>> + Send + Sync + 'static,
360	RuntimeApi::RuntimeApi: RuntimeApiCollection,
361	Customizations: ClientCustomizations + 'static,
362	Net: NetworkBackend<Block, Hash>,
363{
364	use async_io::Timer;
365	use futures::Stream;
366	use sc_consensus_manual_seal::{EngineCommand, ManualSealParams};
367
368	let sc_service::PartialComponents {
369		client,
370		backend,
371		mut task_manager,
372		import_queue,
373		keystore_container,
374		select_chain: maybe_select_chain,
375		transaction_pool,
376		other:
377			(
378				block_import_pipeline,
379				filter_pool,
380				mut telemetry,
381				_telemetry_worker_handle,
382				frontier_backend,
383				fee_history_cache,
384			),
385	} = lazy_loading::new_lazy_loading_partial::<RuntimeApi, Customizations>(
386		&mut config,
387		&rpc_config,
388		&lazy_loading_config,
389	)?;
390
391	let start_delay = 10;
392	let lazy_loading_startup_disclaimer = format!(
393		r#"
394
395		You are now running the Moonbeam client in lazy loading mode, where data is retrieved
396		from a live RPC node on demand.
397
398		Using remote state from: {rpc}
399		Forking from block: {fork_block}
400
401		To ensure the client works properly, please note the following:
402
403		    1. *Avoid Throttling*: Ensure that the backing RPC node is not limiting the number of
404		    requests, as this can prevent the lazy loading client from functioning correctly;
405
406		    2. *Be Patient*: As the client may take approximately 20 times longer than normal to
407		    retrieve and process the necessary data for the requested operation.
408
409
410		The service will start in {start_delay} seconds...
411
412		"#,
413		rpc = lazy_loading_config.state_rpc,
414		fork_block = backend.fork_checkpoint.number
415	);
416
417	log::warn!(
418		"{}",
419		ansi_term::Colour::Yellow.paint(lazy_loading_startup_disclaimer)
420	);
421	tokio::time::sleep(Duration::from_secs(start_delay)).await;
422
423	let block_import = if let MoonbeamBlockImport::Dev(block_import) = block_import_pipeline {
424		block_import
425	} else {
426		return Err(ServiceError::Other(
427			"Block import pipeline is not dev".to_string(),
428		));
429	};
430
431	let prometheus_registry = config.prometheus_registry().cloned();
432	let net_config =
433		FullNetworkConfiguration::<_, _, Net>::new(&config.network, prometheus_registry.clone());
434
435	let metrics = Net::register_notification_metrics(
436		config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
437	);
438
439	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
440		sc_service::build_network(sc_service::BuildNetworkParams {
441			config: &config,
442			client: client.clone(),
443			transaction_pool: transaction_pool.clone(),
444			spawn_handle: task_manager.spawn_handle(),
445			import_queue,
446			block_announce_validator_builder: None,
447			warp_sync_config: None,
448			net_config,
449			block_relay: None,
450			metrics,
451		})?;
452
453	if config.offchain_worker.enabled {
454		task_manager.spawn_handle().spawn(
455			"offchain-workers-runner",
456			"offchain-work",
457			sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
458				runtime_api_provider: client.clone(),
459				keystore: Some(keystore_container.keystore()),
460				offchain_db: backend.offchain_storage(),
461				transaction_pool: Some(OffchainTransactionPoolFactory::new(
462					transaction_pool.clone(),
463				)),
464				network_provider: Arc::new(network.clone()),
465				is_validator: config.role.is_authority(),
466				enable_http_requests: true,
467				custom_extensions: move |_| vec![],
468			})?
469			.run(client.clone(), task_manager.spawn_handle())
470			.boxed(),
471		);
472	}
473
474	let prometheus_registry = config.prometheus_registry().cloned();
475	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
476	let fee_history_limit = rpc_config.fee_history_limit;
477	let mut command_sink = None;
478	let mut dev_rpc_data = None;
479	let collator = config.role.is_authority();
480
481	let parachain_id: ParaId = helpers::get_parachain_id(backend.rpc_client.clone())
482		.unwrap_or_else(|| panic!("Could not get parachain identifier for lazy loading mode."))
483		.into();
484
485	if collator {
486		let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
487			task_manager.spawn_handle(),
488			client.clone(),
489			transaction_pool.clone(),
490			prometheus_registry.as_ref(),
491			telemetry.as_ref().map(|x| x.handle()),
492		);
493		env.set_soft_deadline(SOFT_DEADLINE_PERCENT);
494
495		let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
496			match sealing {
497				moonbeam_cli_opt::Sealing::Instant => {
498					Box::new(
499						// This bit cribbed from the implementation of instant seal.
500						transaction_pool.import_notification_stream().map(|_| {
501							EngineCommand::SealNewBlock {
502								create_empty: false,
503								finalize: false,
504								parent_hash: None,
505								sender: None,
506							}
507						}),
508					)
509				}
510				moonbeam_cli_opt::Sealing::Manual => {
511					let (sink, stream) = futures::channel::mpsc::channel(1000);
512					// Keep a reference to the other end of the channel. It goes to the RPC.
513					command_sink = Some(sink);
514					Box::new(stream)
515				}
516				moonbeam_cli_opt::Sealing::Interval(millis) => Box::new(StreamExt::map(
517					Timer::interval(Duration::from_millis(millis)),
518					|_| EngineCommand::SealNewBlock {
519						create_empty: true,
520						finalize: false,
521						parent_hash: None,
522						sender: None,
523					},
524				)),
525			};
526
527		let select_chain = maybe_select_chain.expect(
528			"`new_lazy_loading_partial` builds a `LongestChainRule` when building dev service.\
529				We specified the dev service when calling `new_partial`.\
530				Therefore, a `LongestChainRule` is present. qed.",
531		);
532
533		// Create channels for mocked XCM messages.
534		let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
535		let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
536		let additional_relay_offset = Arc::new(std::sync::atomic::AtomicU32::new(0));
537		dev_rpc_data = Some((
538			downward_xcm_sender,
539			hrmp_xcm_sender,
540			additional_relay_offset.clone(),
541		));
542
543		// Need to clone it and store here to avoid moving of `client`
544		// variable in closure below.
545		let client_vrf = client.clone();
546
547		let keystore_clone = keystore_container.keystore().clone();
548		let maybe_provide_vrf_digest =
549			move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
550				moonbeam_vrf::vrf_pre_digest::<Block, LazyLoadingClient<RuntimeApi>>(
551					&client_vrf,
552					&keystore_clone,
553					nimbus_id,
554					parent,
555				)
556			};
557
558		// Need to clone it and store here to avoid moving of `client`
559		// variable in closure below.
560		let client_for_cidp = client.clone();
561
562		task_manager.spawn_essential_handle().spawn_blocking(
563			"authorship_task",
564			Some("block-authoring"),
565			manual_sealing::run_manual_seal(ManualSealParams {
566				block_import,
567				env,
568				client: client.clone(),
569				pool: transaction_pool.clone(),
570				commands_stream,
571				select_chain,
572				consensus_data_provider: Some(Box::new(NimbusManualSealConsensusDataProvider {
573					keystore: keystore_container.keystore(),
574					client: client.clone(),
575					additional_digests_provider: maybe_provide_vrf_digest,
576					_phantom: Default::default(),
577				})),
578				create_inherent_data_providers: move |block: H256, ()| {
579					let maybe_current_para_block = client_for_cidp.number(block);
580					let maybe_current_para_head = client_for_cidp.expect_header(block);
581					let downward_xcm_receiver = downward_xcm_receiver.clone();
582					let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();
583					let additional_relay_offset = additional_relay_offset.clone();
584
585					// Need to clone it and store here to avoid moving of `client`
586					// variable in closure below.
587					let client_for_xcm = client_for_cidp.clone();
588
589					async move {
590						MockTimestampInherentDataProvider::advance_timestamp(
591							RELAY_CHAIN_SLOT_DURATION_MILLIS,
592						);
593
594						// Get the mocked timestamp
595						let timestamp = TIMESTAMP.load(Ordering::SeqCst);
596						// Calculate mocked slot number
597						let slot = timestamp.saturating_div(RELAY_CHAIN_SLOT_DURATION_MILLIS);
598
599						let current_para_block = maybe_current_para_block?
600							.ok_or(sp_blockchain::Error::UnknownBlock(block.to_string()))?;
601
602						let current_para_block_head = Some(polkadot_primitives::HeadData(
603							maybe_current_para_head?.encode(),
604						));
605
606						let additional_key_values = vec![
607							(
608								// TIMESTAMP_NOW was deprecated in runtime 4000, but should
609								// be kept for backwards compatibility with old runtimes
610								pallet_timestamp::Now::<moonbeam_runtime::Runtime>::hashed_key()
611									.to_vec(),
612								timestamp.encode(),
613							),
614							// Override current slot number
615							(
616								relay_chain::well_known_keys::CURRENT_SLOT.to_vec(),
617								Slot::from(slot).encode(),
618							),
619							(
620								relay_chain::well_known_keys::ACTIVE_CONFIG.to_vec(),
621								AbridgedHostConfiguration {
622									max_code_size: 3_145_728,
623									max_head_data_size: 20_480,
624									max_upward_queue_count: 174_762,
625									max_upward_queue_size: 1_048_576,
626									max_upward_message_size: 65_531,
627									max_upward_message_num_per_candidate: 16,
628									hrmp_max_message_num_per_candidate: 10,
629									validation_upgrade_cooldown: 14_400,
630									validation_upgrade_delay: 600,
631									async_backing_params: AsyncBackingParams {
632										max_candidate_depth: 3,
633										allowed_ancestry_len: 2,
634									},
635								}
636								.encode(),
637							),
638						];
639
640						let current_para_head = client_for_xcm
641							.header(block)
642							.expect("Header lookup should succeed")
643							.expect("Header passed in as parent should be present in backend.");
644
645						let should_send_go_ahead = match client_for_xcm
646							.runtime_api()
647							.collect_collation_info(block, &current_para_head)
648						{
649							Ok(info) => info.new_validation_code.is_some(),
650							Err(e) => {
651								log::error!("Failed to collect collation info: {:?}", e);
652								false
653							}
654						};
655
656						let mocked_parachain = MockValidationDataInherentDataProvider {
657							current_para_block,
658							para_id: parachain_id,
659							upgrade_go_ahead: should_send_go_ahead.then(|| {
660								log::info!(
661									"Detected pending validation code, sending go-ahead signal."
662								);
663								UpgradeGoAhead::GoAhead
664							}),
665							current_para_block_head,
666							relay_offset: additional_relay_offset.load(Ordering::SeqCst),
667							relay_blocks_per_para_block: 1,
668							para_blocks_per_relay_epoch: 10,
669							relay_randomness_config: (),
670							xcm_config: MockXcmConfig::new(
671								&*client_for_xcm,
672								block,
673								Default::default(),
674							),
675							raw_downward_messages: downward_xcm_receiver.drain().collect(),
676							raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
677							additional_key_values: Some(additional_key_values),
678						};
679
680						let randomness = session_keys_primitives::InherentDataProvider;
681
682						Ok((
683							MockTimestampInherentDataProvider,
684							mocked_parachain,
685							randomness,
686						))
687					}
688				},
689			}),
690		);
691	}
692
693	// Sinks for pubsub notifications.
694	// Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
695	// The MappingSyncWorker sends through the channel on block import and the subscription emits a
696	// notification to the subscriber on receiving a message through this channel.
697	// This way we avoid race conditions when using native substrate block import notification
698	// stream.
699	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
700		fc_mapping_sync::EthereumBlockNotification<Block>,
701	> = Default::default();
702	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
703
704	/* TODO: only enable this when frontier backend is compatible with lazy-loading
705	rpc::spawn_essential_tasks(
706		rpc::SpawnTasksParams {
707			task_manager: &task_manager,
708			client: client.clone(),
709			substrate_backend: backend.clone(),
710			frontier_backend: frontier_backend.clone(),
711			filter_pool: filter_pool.clone(),
712			overrides: overrides.clone(),
713			fee_history_limit,
714			fee_history_cache: fee_history_cache.clone(),
715		},
716		sync_service.clone(),
717		pubsub_notification_sinks.clone(),
718	);
719	*/
720
721	let ethapi_cmd = rpc_config.ethapi.clone();
722	let tracing_requesters =
723		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
724			rpc::tracing::spawn_tracing_tasks(
725				&rpc_config,
726				prometheus_registry.clone(),
727				rpc::SpawnTasksParams {
728					task_manager: &task_manager,
729					client: client.clone(),
730					substrate_backend: backend.clone(),
731					frontier_backend: frontier_backend.clone(),
732					filter_pool: filter_pool.clone(),
733					overrides: overrides.clone(),
734					fee_history_limit,
735					fee_history_cache: fee_history_cache.clone(),
736				},
737			)
738		} else {
739			rpc::tracing::RpcRequesters {
740				debug: None,
741				trace: None,
742			}
743		};
744
745	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
746		task_manager.spawn_handle(),
747		overrides.clone(),
748		rpc_config.eth_log_block_cache,
749		rpc_config.eth_statuses_cache,
750		prometheus_registry,
751	));
752
753	let rpc_builder = {
754		let client = client.clone();
755		let pool = transaction_pool.clone();
756		let backend = backend.clone();
757		let network = network.clone();
758		let sync = sync_service.clone();
759		let ethapi_cmd = ethapi_cmd.clone();
760		let max_past_logs = rpc_config.max_past_logs;
761		let max_block_range = rpc_config.max_block_range;
762		let overrides = overrides.clone();
763		let fee_history_cache = fee_history_cache.clone();
764		let block_data_cache = block_data_cache.clone();
765		let pubsub_notification_sinks = pubsub_notification_sinks.clone();
766
767		let keystore = keystore_container.keystore();
768		let command_sink_for_task = command_sink.clone();
769		move |subscription_task_executor| {
770			let deps = rpc::FullDeps {
771				backend: backend.clone(),
772				client: client.clone(),
773				command_sink: command_sink_for_task.clone(),
774				ethapi_cmd: ethapi_cmd.clone(),
775				filter_pool: filter_pool.clone(),
776				frontier_backend: Arc::new(LazyLoadingFrontierBackend {
777					rpc_client: backend.clone().rpc_client.clone(),
778					frontier_backend: match *frontier_backend {
779						fc_db::Backend::KeyValue(ref b) => b.clone(),
780						fc_db::Backend::Sql(ref b) => b.clone(),
781					},
782				}),
783				graph: pool.clone(),
784				pool: pool.clone(),
785				is_authority: collator,
786				max_past_logs,
787				max_block_range,
788				fee_history_limit,
789				fee_history_cache: fee_history_cache.clone(),
790				network: network.clone(),
791				sync: sync.clone(),
792				dev_rpc_data: dev_rpc_data.clone(),
793				overrides: overrides.clone(),
794				block_data_cache: block_data_cache.clone(),
795				forced_parent_hashes: None,
796			};
797
798			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
799				client.clone(),
800				keystore.clone(),
801			));
802			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
803				rpc::create_full(
804					deps,
805					subscription_task_executor,
806					Some(crate::rpc::TracingConfig {
807						tracing_requesters: tracing_requesters.clone(),
808						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
809					}),
810					pubsub_notification_sinks.clone(),
811					pending_consensus_data_provider,
812					parachain_id,
813				)
814				.map_err(Into::into)
815			} else {
816				rpc::create_full(
817					deps,
818					subscription_task_executor,
819					None,
820					pubsub_notification_sinks.clone(),
821					pending_consensus_data_provider,
822					parachain_id,
823				)
824				.map_err(Into::into)
825			}
826		}
827	};
828
829	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
830		network,
831		client,
832		keystore: keystore_container.keystore(),
833		task_manager: &mut task_manager,
834		transaction_pool,
835		rpc_builder: Box::new(rpc_builder),
836		backend,
837		system_rpc_tx,
838		sync_service: sync_service.clone(),
839		config,
840		tx_handler_controller,
841		telemetry: None,
842	})?;
843
844	if let Some(hwbench) = hwbench {
845		sc_sysinfo::print_hwbench(&hwbench);
846
847		if let Some(ref mut telemetry) = telemetry {
848			let telemetry_handle = telemetry.handle();
849			task_manager.spawn_handle().spawn(
850				"telemetry_hwbench",
851				None,
852				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
853			);
854		}
855	}
856
857	log::info!("Service Ready");
858
859	Ok(task_manager)
860}
861
862pub fn spec_builder() -> sc_chain_spec::ChainSpecBuilder<Extensions, HostFunctions> {
863	crate::chain_spec::moonbeam::ChainSpec::builder(
864		moonbeam_runtime::WASM_BINARY.expect("WASM binary was not build, please build it!"),
865		Default::default(),
866	)
867	.with_name("Lazy Loading")
868	.with_id("lazy_loading")
869	.with_chain_type(ChainType::Development)
870	.with_properties(
871		serde_json::from_str(
872			"{\"tokenDecimals\": 18, \"tokenSymbol\": \"GLMR\", \"SS58Prefix\": 1284}",
873		)
874		.expect("Provided valid json map"),
875	)
876	.with_genesis_config_preset_name(sp_genesis_builder::DEV_RUNTIME_PRESET)
877}