1use crate::chain_spec::Extensions;
18use crate::{
19 lazy_loading, open_frontier_backend, rpc, set_prometheus_registry, ClientCustomizations,
20 FrontierBlockImport, HostFunctions, MockTimestampInherentDataProvider, MoonbeamBlockImport,
21 PartialComponentsResult, PendingConsensusDataProvider, RuntimeApiCollection,
22 RELAY_CHAIN_SLOT_DURATION_MILLIS, SOFT_DEADLINE_PERCENT, TIMESTAMP,
23};
24use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
25use cumulus_primitives_core::{relay_chain, BlockT, CollectCollationInfo, ParaId};
26use fc_rpc::StorageOverrideHandler;
27use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
28use frontier_backend::LazyLoadingFrontierBackend;
29use futures::{FutureExt, StreamExt};
30use moonbeam_cli_opt::{EthApi as EthApiCmd, LazyLoadingConfig, RpcConfig};
31use moonbeam_core_primitives::{Block, Hash};
32use nimbus_consensus::NimbusManualSealConsensusDataProvider;
33use nimbus_primitives::NimbusId;
34use parity_scale_codec::Encode;
35use polkadot_primitives::{AbridgedHostConfiguration, AsyncBackingParams, Slot, UpgradeGoAhead};
36use sc_chain_spec::{get_extension, BuildGenesisBlock, ChainType, GenesisBlockBuilder};
37use sc_client_api::{Backend, BadBlocks, ExecutorProvider, ForkBlocks};
38use sc_executor::{HeapAllocStrategy, RuntimeVersionOf, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
39use sc_network::config::FullNetworkConfiguration;
40use sc_network::NetworkBackend;
41use sc_network_common::sync::SyncMode;
42use sc_service::{
43 error::Error as ServiceError, ClientConfig, Configuration, Error, KeystoreContainer,
44 LocalCallExecutor, PartialComponents, TaskManager,
45};
46use sc_telemetry::{TelemetryHandle, TelemetryWorker};
47use sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool};
48use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi};
49use sp_blockchain::HeaderBackend;
50use sp_core::traits::CodeExecutor;
51use sp_core::H256;
52use sp_runtime::traits::NumberFor;
53use std::collections::BTreeMap;
54use std::str::FromStr;
55use std::sync::atomic::Ordering;
56use std::sync::{Arc, Mutex};
57use std::time::Duration;
58
59pub mod call_executor;
60mod client;
61pub mod frontier_backend;
62mod helpers;
63mod lock;
64mod manual_sealing;
65mod rpc_client;
66mod state_overrides;
67pub mod substrate_backend;
68
/// Log target name used for lazy loading related log output.
pub const LAZY_LOADING_LOG_TARGET: &str = "lazy-loading";
70
/// Client type used in lazy loading mode: an `sc_service` client parameterized with the
/// lazy loading backend ([`TLazyLoadingBackend`]) and call executor
/// ([`TLazyLoadingCallExecutor`]).
pub type TLazyLoadingClient<TBl, TRtApi, TExec> = sc_service::client::Client<
    TLazyLoadingBackend<TBl>,
    TLazyLoadingCallExecutor<TBl, TExec>,
    TBl,
    TRtApi,
>;
78
/// Backend type used in lazy loading mode (alias for [`substrate_backend::Backend`]).
pub type TLazyLoadingBackend<TBl> = substrate_backend::Backend<TBl>;
81
/// Call executor type used in lazy loading mode: a
/// [`call_executor::LazyLoadingCallExecutor`] parameterized with a [`LocalCallExecutor`]
/// over the lazy loading backend.
pub type TLazyLoadingCallExecutor<TBl, TExec> = call_executor::LazyLoadingCallExecutor<
    TBl,
    LocalCallExecutor<TBl, TLazyLoadingBackend<TBl>, TExec>,
>;
87
/// Tuple returned by [`new_lazy_loading_parts`]: the client, the shared backend, the
/// keystore container and the task manager, in that order.
pub type TLazyLoadingParts<TBl, TRtApi, TExec> = (
    TLazyLoadingClient<TBl, TRtApi, TExec>,
    Arc<TLazyLoadingBackend<TBl>>,
    KeystoreContainer,
    TaskManager,
);
95
/// [`TLazyLoadingClient`] specialized for the `Block` type and a [`WasmExecutor`] using this
/// crate's `HostFunctions`.
type LazyLoadingClient<RuntimeApi> =
    TLazyLoadingClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
/// [`TLazyLoadingBackend`] specialized for the `Block` type.
type LazyLoadingBackend = TLazyLoadingBackend<Block>;
99
100pub fn new_lazy_loading_parts<TBl, TRtApi, TExec>(
102 config: &mut Configuration,
103 lazy_loading_config: &LazyLoadingConfig,
104 telemetry: Option<TelemetryHandle>,
105 executor: TExec,
106) -> Result<TLazyLoadingParts<TBl, TRtApi, TExec>, Error>
107where
108 TBl: BlockT + sp_runtime::DeserializeOwned,
109 TBl::Hash: From<H256>,
110 TExec: CodeExecutor + RuntimeVersionOf + Clone,
111{
112 let backend = substrate_backend::new_backend(config, &lazy_loading_config)?;
113
114 let genesis_block_builder = GenesisBlockBuilder::new(
115 config.chain_spec.as_storage_builder(),
116 !config.no_genesis(),
117 backend.clone(),
118 executor.clone(),
119 )?;
120
121 new_lazy_loading_parts_with_genesis_builder(
122 config,
123 telemetry,
124 executor,
125 backend,
126 genesis_block_builder,
127 )
128}
129
130pub fn new_lazy_loading_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
132 config: &Configuration,
133 telemetry: Option<TelemetryHandle>,
134 executor: TExec,
135 backend: Arc<TLazyLoadingBackend<TBl>>,
136 genesis_block_builder: TBuildGenesisBlock,
137) -> Result<TLazyLoadingParts<TBl, TRtApi, TExec>, Error>
138where
139 TBl: BlockT + sp_runtime::DeserializeOwned,
140 TBl::Hash: From<H256>,
141 TExec: CodeExecutor + RuntimeVersionOf + Clone,
142 TBuildGenesisBlock:
143 BuildGenesisBlock<
144 TBl,
145 BlockImportOperation = <TLazyLoadingBackend<TBl> as sc_client_api::backend::Backend<
146 TBl,
147 >>::BlockImportOperation,
148 >,
149{
150 let keystore_container = KeystoreContainer::new(&config.keystore)?;
151
152 let task_manager = {
153 let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
154 TaskManager::new(config.tokio_handle.clone(), registry)?
155 };
156
157 let chain_spec = &config.chain_spec;
158 let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
159 .cloned()
160 .unwrap_or_default();
161
162 let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
163 .cloned()
164 .unwrap_or_default();
165
166 let client = {
167 let extensions = sc_client_api::execution_extensions::ExecutionExtensions::new(
168 None,
169 Arc::new(executor.clone()),
170 );
171
172 let wasm_runtime_substitutes = config
173 .chain_spec
174 .code_substitutes()
175 .into_iter()
176 .map(|(n, c)| {
177 let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
178 Error::Application(Box::from(format!(
179 "Failed to parse `{}` as block number for code substitutes. \
180 In an old version the key for code substitute was a block hash. \
181 Please update the chain spec to a version that is compatible with your node.",
182 n
183 )))
184 })?;
185 Ok((number, c))
186 })
187 .collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
188
189 let client = client::new_client(
190 backend.clone(),
191 executor,
192 genesis_block_builder,
193 fork_blocks,
194 bad_blocks,
195 extensions,
196 Box::new(task_manager.spawn_handle()),
197 config
198 .prometheus_config
199 .as_ref()
200 .map(|config| config.registry.clone()),
201 telemetry,
202 ClientConfig {
203 offchain_worker_enabled: config.offchain_worker.enabled,
204 offchain_indexing_api: config.offchain_worker.indexing_enabled,
205 wasmtime_precompiled: config.executor.wasmtime_precompiled.clone(),
206 wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
207 no_genesis: matches!(
208 config.network.sync_mode,
209 SyncMode::LightState { .. } | SyncMode::Warp { .. }
210 ),
211 wasm_runtime_substitutes,
212 enable_import_proof_recording: true,
213 },
214 )?;
215
216 client
217 };
218
219 Ok((client, backend, keystore_container, task_manager))
220}
221
222#[allow(clippy::type_complexity)]
224pub fn new_lazy_loading_partial<RuntimeApi, Customizations>(
225 config: &mut Configuration,
226 rpc_config: &RpcConfig,
227 lazy_loading_config: &LazyLoadingConfig,
228) -> PartialComponentsResult<LazyLoadingClient<RuntimeApi>, LazyLoadingBackend>
229where
230 RuntimeApi: ConstructRuntimeApi<Block, LazyLoadingClient<RuntimeApi>> + Send + Sync + 'static,
231 RuntimeApi::RuntimeApi: RuntimeApiCollection,
232 Customizations: ClientCustomizations + 'static,
233{
234 set_prometheus_registry(config, rpc_config.no_prometheus_prefix)?;
235
236 config.rpc.id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));
238
239 let telemetry = config
240 .telemetry_endpoints
241 .clone()
242 .filter(|x| !x.is_empty())
243 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
244 let worker = TelemetryWorker::new(16)?;
245 let telemetry = worker.handle().new_telemetry(endpoints);
246 Ok((worker, telemetry))
247 })
248 .transpose()?;
249
250 let heap_pages = config
251 .executor
252 .default_heap_pages
253 .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
254 extra_pages: h as _,
255 });
256 let mut wasm_builder = WasmExecutor::builder()
257 .with_execution_method(config.executor.wasm_method)
258 .with_onchain_heap_alloc_strategy(heap_pages)
259 .with_offchain_heap_alloc_strategy(heap_pages)
260 .with_ignore_onchain_heap_pages(true)
261 .with_max_runtime_instances(config.executor.max_runtime_instances)
262 .with_runtime_cache_size(config.executor.runtime_cache_size);
263
264 if let Some(ref wasmtime_precompiled_path) = config.executor.wasmtime_precompiled {
265 wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
266 }
267
268 let executor = wasm_builder.build();
269
270 let (client, backend, keystore_container, task_manager) =
271 new_lazy_loading_parts::<Block, RuntimeApi, _>(
272 config,
273 lazy_loading_config,
274 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
275 executor,
276 )?;
277
278 if let Some(block_number) = Customizations::first_block_number_compatible_with_ed25519_zebra() {
279 client
280 .execution_extensions()
281 .set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
282 Block,
283 sp_io::UseDalekExt,
284 >::new(block_number));
285 }
286
287 let client = Arc::new(client);
288
289 let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
290
291 let telemetry = telemetry.map(|(worker, telemetry)| {
292 task_manager
293 .spawn_handle()
294 .spawn("telemetry", None, worker.run());
295 telemetry
296 });
297
298 let maybe_select_chain = Some(sc_consensus::LongestChain::new(backend.clone()));
299
300 let transaction_pool = sc_transaction_pool::Builder::new(
301 task_manager.spawn_essential_handle(),
302 client.clone(),
303 config.role.is_authority().into(),
304 )
305 .with_options(config.transaction_pool.clone())
306 .with_prometheus(config.prometheus_registry())
307 .build();
308
309 let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
310 let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));
311
312 let frontier_backend = Arc::new(open_frontier_backend(client.clone(), config, rpc_config)?);
313 let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());
314
315 let create_inherent_data_providers = move |_, _| async move { Ok(()) };
316
317 let import_queue = nimbus_consensus::import_queue(
318 client.clone(),
319 frontier_block_import.clone(),
320 create_inherent_data_providers,
321 &task_manager.spawn_essential_handle(),
322 config.prometheus_registry(),
323 false,
324 false,
325 )?;
326 let block_import = MoonbeamBlockImport::Dev(frontier_block_import);
327
328 Ok(PartialComponents {
329 backend,
330 client,
331 import_queue,
332 keystore_container,
333 task_manager,
334 transaction_pool: transaction_pool.into(),
335 select_chain: maybe_select_chain,
336 other: (
337 block_import,
338 filter_pool,
339 telemetry,
340 telemetry_worker_handle,
341 frontier_backend,
342 fee_history_cache,
343 ),
344 })
345}
346
347#[sc_tracing::logging::prefix_logs_with("Lazy loading 🌗")]
350pub async fn new_lazy_loading_service<RuntimeApi, Customizations, Net>(
351 mut config: Configuration,
352 _author_id: Option<NimbusId>,
353 sealing: moonbeam_cli_opt::Sealing,
354 rpc_config: RpcConfig,
355 lazy_loading_config: LazyLoadingConfig,
356 hwbench: Option<sc_sysinfo::HwBench>,
357) -> Result<TaskManager, ServiceError>
358where
359 RuntimeApi: ConstructRuntimeApi<Block, LazyLoadingClient<RuntimeApi>> + Send + Sync + 'static,
360 RuntimeApi::RuntimeApi: RuntimeApiCollection,
361 Customizations: ClientCustomizations + 'static,
362 Net: NetworkBackend<Block, Hash>,
363{
364 use async_io::Timer;
365 use futures::Stream;
366 use sc_consensus_manual_seal::{EngineCommand, ManualSealParams};
367
368 let sc_service::PartialComponents {
369 client,
370 backend,
371 mut task_manager,
372 import_queue,
373 keystore_container,
374 select_chain: maybe_select_chain,
375 transaction_pool,
376 other:
377 (
378 block_import_pipeline,
379 filter_pool,
380 mut telemetry,
381 _telemetry_worker_handle,
382 frontier_backend,
383 fee_history_cache,
384 ),
385 } = lazy_loading::new_lazy_loading_partial::<RuntimeApi, Customizations>(
386 &mut config,
387 &rpc_config,
388 &lazy_loading_config,
389 )?;
390
391 let start_delay = 10;
392 let lazy_loading_startup_disclaimer = format!(
393 r#"
394
395 You are now running the Moonbeam client in lazy loading mode, where data is retrieved
396 from a live RPC node on demand.
397
398 Using remote state from: {rpc}
399 Forking from block: {fork_block}
400
401 To ensure the client works properly, please note the following:
402
403 1. *Avoid Throttling*: Ensure that the backing RPC node is not limiting the number of
404 requests, as this can prevent the lazy loading client from functioning correctly;
405
406 2. *Be Patient*: As the client may take approximately 20 times longer than normal to
407 retrieve and process the necessary data for the requested operation.
408
409
410 The service will start in {start_delay} seconds...
411
412 "#,
413 rpc = lazy_loading_config.state_rpc,
414 fork_block = backend.fork_checkpoint.number
415 );
416
417 log::warn!(
418 "{}",
419 ansi_term::Colour::Yellow.paint(lazy_loading_startup_disclaimer)
420 );
421 tokio::time::sleep(Duration::from_secs(start_delay)).await;
422
423 let block_import = if let MoonbeamBlockImport::Dev(block_import) = block_import_pipeline {
424 block_import
425 } else {
426 return Err(ServiceError::Other(
427 "Block import pipeline is not dev".to_string(),
428 ));
429 };
430
431 let prometheus_registry = config.prometheus_registry().cloned();
432 let net_config =
433 FullNetworkConfiguration::<_, _, Net>::new(&config.network, prometheus_registry.clone());
434
435 let metrics = Net::register_notification_metrics(
436 config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
437 );
438
439 let (network, system_rpc_tx, tx_handler_controller, sync_service) =
440 sc_service::build_network(sc_service::BuildNetworkParams {
441 config: &config,
442 client: client.clone(),
443 transaction_pool: transaction_pool.clone(),
444 spawn_handle: task_manager.spawn_handle(),
445 import_queue,
446 block_announce_validator_builder: None,
447 warp_sync_config: None,
448 net_config,
449 block_relay: None,
450 metrics,
451 })?;
452
453 if config.offchain_worker.enabled {
454 task_manager.spawn_handle().spawn(
455 "offchain-workers-runner",
456 "offchain-work",
457 sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
458 runtime_api_provider: client.clone(),
459 keystore: Some(keystore_container.keystore()),
460 offchain_db: backend.offchain_storage(),
461 transaction_pool: Some(OffchainTransactionPoolFactory::new(
462 transaction_pool.clone(),
463 )),
464 network_provider: Arc::new(network.clone()),
465 is_validator: config.role.is_authority(),
466 enable_http_requests: true,
467 custom_extensions: move |_| vec![],
468 })?
469 .run(client.clone(), task_manager.spawn_handle())
470 .boxed(),
471 );
472 }
473
474 let prometheus_registry = config.prometheus_registry().cloned();
475 let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
476 let fee_history_limit = rpc_config.fee_history_limit;
477 let mut command_sink = None;
478 let mut dev_rpc_data = None;
479 let collator = config.role.is_authority();
480
481 let parachain_id: ParaId = helpers::get_parachain_id(backend.rpc_client.clone())
482 .unwrap_or_else(|| panic!("Could not get parachain identifier for lazy loading mode."))
483 .into();
484
485 if collator {
486 let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
487 task_manager.spawn_handle(),
488 client.clone(),
489 transaction_pool.clone(),
490 prometheus_registry.as_ref(),
491 telemetry.as_ref().map(|x| x.handle()),
492 );
493 env.set_soft_deadline(SOFT_DEADLINE_PERCENT);
494
495 let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
496 match sealing {
497 moonbeam_cli_opt::Sealing::Instant => {
498 Box::new(
499 transaction_pool.import_notification_stream().map(|_| {
501 EngineCommand::SealNewBlock {
502 create_empty: false,
503 finalize: false,
504 parent_hash: None,
505 sender: None,
506 }
507 }),
508 )
509 }
510 moonbeam_cli_opt::Sealing::Manual => {
511 let (sink, stream) = futures::channel::mpsc::channel(1000);
512 command_sink = Some(sink);
514 Box::new(stream)
515 }
516 moonbeam_cli_opt::Sealing::Interval(millis) => Box::new(StreamExt::map(
517 Timer::interval(Duration::from_millis(millis)),
518 |_| EngineCommand::SealNewBlock {
519 create_empty: true,
520 finalize: false,
521 parent_hash: None,
522 sender: None,
523 },
524 )),
525 };
526
527 let select_chain = maybe_select_chain.expect(
528 "`new_lazy_loading_partial` builds a `LongestChainRule` when building dev service.\
529 We specified the dev service when calling `new_partial`.\
530 Therefore, a `LongestChainRule` is present. qed.",
531 );
532
533 let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
535 let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
536 let additional_relay_offset = Arc::new(std::sync::atomic::AtomicU32::new(0));
537 dev_rpc_data = Some((
538 downward_xcm_sender,
539 hrmp_xcm_sender,
540 additional_relay_offset.clone(),
541 ));
542
543 let client_vrf = client.clone();
546
547 let keystore_clone = keystore_container.keystore().clone();
548 let maybe_provide_vrf_digest =
549 move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
550 moonbeam_vrf::vrf_pre_digest::<Block, LazyLoadingClient<RuntimeApi>>(
551 &client_vrf,
552 &keystore_clone,
553 nimbus_id,
554 parent,
555 )
556 };
557
558 let client_for_cidp = client.clone();
561
562 task_manager.spawn_essential_handle().spawn_blocking(
563 "authorship_task",
564 Some("block-authoring"),
565 manual_sealing::run_manual_seal(ManualSealParams {
566 block_import,
567 env,
568 client: client.clone(),
569 pool: transaction_pool.clone(),
570 commands_stream,
571 select_chain,
572 consensus_data_provider: Some(Box::new(NimbusManualSealConsensusDataProvider {
573 keystore: keystore_container.keystore(),
574 client: client.clone(),
575 additional_digests_provider: maybe_provide_vrf_digest,
576 _phantom: Default::default(),
577 })),
578 create_inherent_data_providers: move |block: H256, ()| {
579 let maybe_current_para_block = client_for_cidp.number(block);
580 let maybe_current_para_head = client_for_cidp.expect_header(block);
581 let downward_xcm_receiver = downward_xcm_receiver.clone();
582 let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();
583 let additional_relay_offset = additional_relay_offset.clone();
584
585 let client_for_xcm = client_for_cidp.clone();
588
589 async move {
590 MockTimestampInherentDataProvider::advance_timestamp(
591 RELAY_CHAIN_SLOT_DURATION_MILLIS,
592 );
593
594 let timestamp = TIMESTAMP.load(Ordering::SeqCst);
596 let slot = timestamp.saturating_div(RELAY_CHAIN_SLOT_DURATION_MILLIS);
598
599 let current_para_block = maybe_current_para_block?
600 .ok_or(sp_blockchain::Error::UnknownBlock(block.to_string()))?;
601
602 let current_para_block_head = Some(polkadot_primitives::HeadData(
603 maybe_current_para_head?.encode(),
604 ));
605
606 let additional_key_values = vec![
607 (
608 pallet_timestamp::Now::<moonbeam_runtime::Runtime>::hashed_key()
611 .to_vec(),
612 timestamp.encode(),
613 ),
614 (
616 relay_chain::well_known_keys::CURRENT_SLOT.to_vec(),
617 Slot::from(slot).encode(),
618 ),
619 (
620 relay_chain::well_known_keys::ACTIVE_CONFIG.to_vec(),
621 AbridgedHostConfiguration {
622 max_code_size: 3_145_728,
623 max_head_data_size: 20_480,
624 max_upward_queue_count: 174_762,
625 max_upward_queue_size: 1_048_576,
626 max_upward_message_size: 65_531,
627 max_upward_message_num_per_candidate: 16,
628 hrmp_max_message_num_per_candidate: 10,
629 validation_upgrade_cooldown: 14_400,
630 validation_upgrade_delay: 600,
631 async_backing_params: AsyncBackingParams {
632 max_candidate_depth: 3,
633 allowed_ancestry_len: 2,
634 },
635 }
636 .encode(),
637 ),
638 ];
639
640 let current_para_head = client_for_xcm
641 .header(block)
642 .expect("Header lookup should succeed")
643 .expect("Header passed in as parent should be present in backend.");
644
645 let should_send_go_ahead = match client_for_xcm
646 .runtime_api()
647 .collect_collation_info(block, ¤t_para_head)
648 {
649 Ok(info) => info.new_validation_code.is_some(),
650 Err(e) => {
651 log::error!("Failed to collect collation info: {:?}", e);
652 false
653 }
654 };
655
656 let mocked_parachain = MockValidationDataInherentDataProvider {
657 current_para_block,
658 para_id: parachain_id,
659 upgrade_go_ahead: should_send_go_ahead.then(|| {
660 log::info!(
661 "Detected pending validation code, sending go-ahead signal."
662 );
663 UpgradeGoAhead::GoAhead
664 }),
665 current_para_block_head,
666 relay_offset: additional_relay_offset.load(Ordering::SeqCst),
667 relay_blocks_per_para_block: 1,
668 para_blocks_per_relay_epoch: 10,
669 relay_randomness_config: (),
670 xcm_config: MockXcmConfig::new(
671 &*client_for_xcm,
672 block,
673 Default::default(),
674 ),
675 raw_downward_messages: downward_xcm_receiver.drain().collect(),
676 raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
677 additional_key_values: Some(additional_key_values),
678 };
679
680 let randomness = session_keys_primitives::InherentDataProvider;
681
682 Ok((
683 MockTimestampInherentDataProvider,
684 mocked_parachain,
685 randomness,
686 ))
687 }
688 },
689 }),
690 );
691 }
692
693 let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
700 fc_mapping_sync::EthereumBlockNotification<Block>,
701 > = Default::default();
702 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
703
704 let ethapi_cmd = rpc_config.ethapi.clone();
722 let tracing_requesters =
723 if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
724 rpc::tracing::spawn_tracing_tasks(
725 &rpc_config,
726 prometheus_registry.clone(),
727 rpc::SpawnTasksParams {
728 task_manager: &task_manager,
729 client: client.clone(),
730 substrate_backend: backend.clone(),
731 frontier_backend: frontier_backend.clone(),
732 filter_pool: filter_pool.clone(),
733 overrides: overrides.clone(),
734 fee_history_limit,
735 fee_history_cache: fee_history_cache.clone(),
736 },
737 )
738 } else {
739 rpc::tracing::RpcRequesters {
740 debug: None,
741 trace: None,
742 }
743 };
744
745 let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
746 task_manager.spawn_handle(),
747 overrides.clone(),
748 rpc_config.eth_log_block_cache,
749 rpc_config.eth_statuses_cache,
750 prometheus_registry,
751 ));
752
753 let rpc_builder = {
754 let client = client.clone();
755 let pool = transaction_pool.clone();
756 let backend = backend.clone();
757 let network = network.clone();
758 let sync = sync_service.clone();
759 let ethapi_cmd = ethapi_cmd.clone();
760 let max_past_logs = rpc_config.max_past_logs;
761 let max_block_range = rpc_config.max_block_range;
762 let overrides = overrides.clone();
763 let fee_history_cache = fee_history_cache.clone();
764 let block_data_cache = block_data_cache.clone();
765 let pubsub_notification_sinks = pubsub_notification_sinks.clone();
766
767 let keystore = keystore_container.keystore();
768 let command_sink_for_task = command_sink.clone();
769 move |subscription_task_executor| {
770 let deps = rpc::FullDeps {
771 backend: backend.clone(),
772 client: client.clone(),
773 command_sink: command_sink_for_task.clone(),
774 ethapi_cmd: ethapi_cmd.clone(),
775 filter_pool: filter_pool.clone(),
776 frontier_backend: Arc::new(LazyLoadingFrontierBackend {
777 rpc_client: backend.clone().rpc_client.clone(),
778 frontier_backend: match *frontier_backend {
779 fc_db::Backend::KeyValue(ref b) => b.clone(),
780 fc_db::Backend::Sql(ref b) => b.clone(),
781 },
782 }),
783 graph: pool.clone(),
784 pool: pool.clone(),
785 is_authority: collator,
786 max_past_logs,
787 max_block_range,
788 fee_history_limit,
789 fee_history_cache: fee_history_cache.clone(),
790 network: network.clone(),
791 sync: sync.clone(),
792 dev_rpc_data: dev_rpc_data.clone(),
793 overrides: overrides.clone(),
794 block_data_cache: block_data_cache.clone(),
795 forced_parent_hashes: None,
796 };
797
798 let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
799 client.clone(),
800 keystore.clone(),
801 ));
802 if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
803 rpc::create_full(
804 deps,
805 subscription_task_executor,
806 Some(crate::rpc::TracingConfig {
807 tracing_requesters: tracing_requesters.clone(),
808 trace_filter_max_count: rpc_config.ethapi_trace_max_count,
809 }),
810 pubsub_notification_sinks.clone(),
811 pending_consensus_data_provider,
812 parachain_id,
813 )
814 .map_err(Into::into)
815 } else {
816 rpc::create_full(
817 deps,
818 subscription_task_executor,
819 None,
820 pubsub_notification_sinks.clone(),
821 pending_consensus_data_provider,
822 parachain_id,
823 )
824 .map_err(Into::into)
825 }
826 }
827 };
828
829 let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
830 network,
831 client,
832 keystore: keystore_container.keystore(),
833 task_manager: &mut task_manager,
834 transaction_pool,
835 rpc_builder: Box::new(rpc_builder),
836 backend,
837 system_rpc_tx,
838 sync_service: sync_service.clone(),
839 config,
840 tx_handler_controller,
841 telemetry: None,
842 })?;
843
844 if let Some(hwbench) = hwbench {
845 sc_sysinfo::print_hwbench(&hwbench);
846
847 if let Some(ref mut telemetry) = telemetry {
848 let telemetry_handle = telemetry.handle();
849 task_manager.spawn_handle().spawn(
850 "telemetry_hwbench",
851 None,
852 sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
853 );
854 }
855 }
856
857 log::info!("Service Ready");
858
859 Ok(task_manager)
860}
861
862pub fn spec_builder() -> sc_chain_spec::ChainSpecBuilder<Extensions, HostFunctions> {
863 crate::chain_spec::moonbeam::ChainSpec::builder(
864 moonbeam_runtime::WASM_BINARY.expect("WASM binary was not build, please build it!"),
865 Default::default(),
866 )
867 .with_name("Lazy Loading")
868 .with_id("lazy_loading")
869 .with_chain_type(ChainType::Development)
870 .with_properties(
871 serde_json::from_str(
872 "{\"tokenDecimals\": 18, \"tokenSymbol\": \"GLMR\", \"SS58Prefix\": 1284}",
873 )
874 .expect("Provided valid json map"),
875 )
876 .with_genesis_config_preset_name(sp_genesis_builder::DEV_RUNTIME_PRESET)
877}