1pub mod rpc;
26
27use cumulus_client_cli::CollatorOptions;
28use cumulus_client_collator::service::CollatorService;
29use cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport;
30use cumulus_client_consensus_proposer::Proposer;
31use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
32use cumulus_client_service::{
33 prepare_node_config, start_relay_chain_tasks, CollatorSybilResistance, DARecoveryProfile,
34 ParachainHostFunctions, StartRelayChainTasksParams,
35};
36use cumulus_primitives_core::{
37 relay_chain::{self, well_known_keys, CollatorPair},
38 CollectCollationInfo, ParaId,
39};
40use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
41use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface, RelayChainResult};
42use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
43use fc_consensus::FrontierBlockImport as TFrontierBlockImport;
44use fc_db::DatabaseSource;
45use fc_rpc::StorageOverrideHandler;
46use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
47use futures::{FutureExt, StreamExt};
48use maplit::hashmap;
49#[cfg(feature = "moonbase-native")]
50pub use moonbase_runtime;
51use moonbeam_cli_opt::{
52 AuthoringPolicy, EthApi as EthApiCmd, FrontierBackendConfig, NodeExtraArgs, RpcConfig,
53};
54#[cfg(feature = "moonbeam-native")]
55pub use moonbeam_runtime;
56use moonbeam_vrf::VrfDigestsProvider;
57#[cfg(feature = "moonriver-native")]
58pub use moonriver_runtime;
59use nimbus_consensus::collators::slot_based::SlotBasedBlockImportHandle;
60use nimbus_consensus::{
61 collators::slot_based::SlotBasedBlockImport, NimbusManualSealConsensusDataProvider,
62};
63use nimbus_primitives::{DigestsProvider, NimbusId};
64use polkadot_primitives::{AbridgedHostConfiguration, AsyncBackingParams, Slot, UpgradeGoAhead};
65use sc_client_api::{
66 backend::{AuxStore, Backend, StateBackend, StorageProvider},
67 ExecutorProvider,
68};
69use sc_consensus::{BlockImport, ImportQueue};
70use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
71use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NetworkBlock};
72use sc_service::config::PrometheusConfig;
73use sc_service::{
74 error::Error as ServiceError, ChainSpec, Configuration, PartialComponents, TFullBackend,
75 TFullClient, TaskManager,
76};
77use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle};
78use sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool};
79use session_keys_primitives::VrfApi;
80use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi};
81use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
82use sp_consensus::SyncOracle;
83use sp_core::{ByteArray, Encode, H256};
84use sp_keystore::{Keystore, KeystorePtr};
85use std::str::FromStr;
86use std::sync::atomic::{AtomicU64, Ordering};
87use std::sync::Arc;
88use std::{collections::BTreeMap, path::Path, sync::Mutex, time::Duration};
89use substrate_prometheus_endpoint::Registry;
90
91pub use client::*;
92pub mod chain_spec;
93mod client;
94#[cfg(feature = "lazy-loading")]
95pub mod lazy_loading;
96
97type FullClient<RuntimeApi> = TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
98type FullBackend = TFullBackend<Block>;
99
100type MaybeSelectChain<Backend> = Option<sc_consensus::LongestChain<Backend, Block>>;
101type FrontierBlockImport<Client> = TFrontierBlockImport<Block, Arc<Client>, Client>;
102type PartialComponentsResult<Client, Backend> = Result<
103 PartialComponents<
104 Client,
105 Backend,
106 MaybeSelectChain<Backend>,
107 sc_consensus::DefaultImportQueue<Block>,
108 sc_transaction_pool::TransactionPoolHandle<Block, Client>,
109 (
110 MoonbeamBlockImport<Client>,
111 Option<FilterPool>,
112 Option<Telemetry>,
113 Option<TelemetryWorkerHandle>,
114 Arc<fc_db::Backend<Block, Client>>,
115 FeeHistoryCache,
116 ),
117 >,
118 ServiceError,
119>;
120
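/// Relay chain slot duration, in milliseconds, assumed by the dev service when mocking relay data.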
121const RELAY_CHAIN_SLOT_DURATION_MILLIS: u64 = 6_000;
122
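/// Timestamp (in milliseconds) shared with `MockTimestampInherentDataProvider`; the dev service
/// advances it by one relay slot for each authored block.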
123static TIMESTAMP: AtomicU64 = AtomicU64::new(0);
124
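/// Provides the timestamp inherent from the global `TIMESTAMP` value instead of the system clock,
/// so dev chains can advance slots independently of wall-clock time.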
125struct MockTimestampInherentDataProvider;
128
129impl MockTimestampInherentDataProvider {
130 fn advance_timestamp(slot_duration: u64) {
131 if TIMESTAMP.load(Ordering::SeqCst) == 0 {
132 TIMESTAMP.store(
134 sp_timestamp::Timestamp::current().as_millis(),
135 Ordering::SeqCst,
136 );
137 } else {
138 TIMESTAMP.fetch_add(slot_duration, Ordering::SeqCst);
139 }
140 }
141}
142
143#[async_trait::async_trait]
144impl sp_inherents::InherentDataProvider for MockTimestampInherentDataProvider {
145 async fn provide_inherent_data(
146 &self,
147 inherent_data: &mut sp_inherents::InherentData,
148 ) -> Result<(), sp_inherents::Error> {
149 inherent_data.put_data(
150 sp_timestamp::INHERENT_IDENTIFIER,
151 &TIMESTAMP.load(Ordering::SeqCst),
152 )
153 }
154
155 async fn try_handle_error(
156 &self,
157 _identifier: &sp_inherents::InherentIdentifier,
158 _error: &[u8],
159 ) -> Option<Result<(), sp_inherents::Error>> {
160 None
162 }
163}
164
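// Host functions provided to the Wasm executor; benchmarking builds additionally include the
// frame-benchmarking host functions.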
165#[cfg(feature = "runtime-benchmarks")]
166pub type HostFunctions = (
167 frame_benchmarking::benchmarking::HostFunctions,
168 ParachainHostFunctions,
169 moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
170);
171#[cfg(not(feature = "runtime-benchmarks"))]
172pub type HostFunctions = (
173 ParachainHostFunctions,
174 moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
175);
176
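/// Block import pipeline wrapping the Frontier block import, selected according to how the node
/// authors blocks: dev (manual seal), parachain lookahead, or parachain slot-based authoring
/// (which also carries the auxiliary handle required by the slot-based collator).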
177pub enum MoonbeamBlockImport<Client> {
178 Dev(FrontierBlockImport<Client>),
180 ParachainLookahead(FrontierBlockImport<Client>),
182 ParachainSlotBased(
184 SlotBasedBlockImport<Block, FrontierBlockImport<Client>, Client>,
185 SlotBasedBlockImportHandle<Block>,
186 ),
187}
188
189impl<Client> MoonbeamBlockImport<Client>
190where
191 Client: Send + Sync + 'static,
192 Client: ProvideRuntimeApi<Block>,
193 Client::Api: sp_block_builder::BlockBuilder<Block> + fp_rpc::EthereumRuntimeRPCApi<Block>,
194 FrontierBlockImport<Client>: BlockImport<Block>,
195 SlotBasedBlockImport<Block, FrontierBlockImport<Client>, Client>: BlockImport<Block, Error = <FrontierBlockImport<Client> as BlockImport<Block>>::Error>
196 + 'static,
197{
198 fn new(
199 frontier_block_import: FrontierBlockImport<Client>,
200 client: Arc<Client>,
201 authoring_policy: AuthoringPolicy,
202 is_dev: bool,
203 ) -> Self {
204 if is_dev {
205 MoonbeamBlockImport::Dev(frontier_block_import)
206 } else {
207 match authoring_policy {
208 AuthoringPolicy::Lookahead => {
209 MoonbeamBlockImport::ParachainLookahead(frontier_block_import)
210 }
211 AuthoringPolicy::SlotBased => {
212 let (block_import, block_import_auxiliary_data) =
213 SlotBasedBlockImport::new(frontier_block_import, client);
214 MoonbeamBlockImport::ParachainSlotBased(
215 block_import,
216 block_import_auxiliary_data,
217 )
218 }
219 }
220 }
221 }
222}
223
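/// Per-runtime customizations applied when building the client. Currently this covers the block
/// number from which ed25519-zebra signature verification is used; earlier blocks keep being
/// verified with ed25519-dalek.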
224pub trait ClientCustomizations {
231 fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
238 None
239 }
240}
241
242#[cfg(feature = "moonbeam-native")]
243pub struct MoonbeamCustomizations;
244#[cfg(feature = "moonbeam-native")]
245impl ClientCustomizations for MoonbeamCustomizations {
246 fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
247 Some(2_000_000)
248 }
249}
250
251#[cfg(feature = "moonriver-native")]
252pub struct MoonriverCustomizations;
253#[cfg(feature = "moonriver-native")]
254impl ClientCustomizations for MoonriverCustomizations {
255 fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
256 Some(3_000_000)
257 }
258}
259
260#[cfg(feature = "moonbase-native")]
261pub struct MoonbaseCustomizations;
262#[cfg(feature = "moonbase-native")]
263impl ClientCustomizations for MoonbaseCustomizations {
264 fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
265 Some(3_000_000)
266 }
267}
268
269#[derive(Clone)]
271pub enum RuntimeVariant {
272 #[cfg(feature = "moonbeam-native")]
273 Moonbeam,
274 #[cfg(feature = "moonriver-native")]
275 Moonriver,
276 #[cfg(feature = "moonbase-native")]
277 Moonbase,
278 Unrecognized,
279}
280
281impl RuntimeVariant {
282 pub fn from_chain_spec(chain_spec: &Box<dyn ChainSpec>) -> Self {
283 match chain_spec {
284 #[cfg(feature = "moonbeam-native")]
285 spec if spec.is_moonbeam() => Self::Moonbeam,
286 #[cfg(feature = "moonriver-native")]
287 spec if spec.is_moonriver() => Self::Moonriver,
288 #[cfg(feature = "moonbase-native")]
289 spec if spec.is_moonbase() => Self::Moonbase,
290 _ => Self::Unrecognized,
291 }
292 }
293}
294
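/// Helpers for inspecting a chain spec to tell which Moonbeam network it targets and whether it
/// is a development chain.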
295pub trait IdentifyVariant {
298 fn is_moonbase(&self) -> bool;
300
301 fn is_moonbeam(&self) -> bool;
303
304 fn is_moonriver(&self) -> bool;
306
307 fn is_dev(&self) -> bool;
309}
310
311impl IdentifyVariant for Box<dyn ChainSpec> {
312 fn is_moonbase(&self) -> bool {
313 self.id().starts_with("moonbase")
314 }
315
316 fn is_moonbeam(&self) -> bool {
317 self.id().starts_with("moonbeam")
318 }
319
320 fn is_moonriver(&self) -> bool {
321 self.id().starts_with("moonriver")
322 }
323
324 fn is_dev(&self) -> bool {
325 self.chain_type() == sc_chain_spec::ChainType::Development
326 }
327}
328
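/// Path of the Frontier (Ethereum mapping) database directory for the given node configuration.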
329pub fn frontier_database_dir(config: &Configuration, path: &str) -> std::path::PathBuf {
330 config
331 .base_path
332 .config_dir(config.chain_spec.id())
333 .join("frontier")
334 .join(path)
335}
336
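/// Opens the Frontier backend selected in the RPC configuration: the key-value backend on top of
/// RocksDB/ParityDB under the `frontier` directory, or the SQLite-based SQL backend.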
337pub fn open_frontier_backend<C, BE>(
340 client: Arc<C>,
341 config: &Configuration,
342 rpc_config: &RpcConfig,
343) -> Result<fc_db::Backend<Block, C>, String>
344where
345 C: ProvideRuntimeApi<Block> + StorageProvider<Block, BE> + AuxStore,
346 C: HeaderBackend<Block> + HeaderMetadata<Block, Error = BlockChainError>,
347 C: Send + Sync + 'static,
348 C::Api: fp_rpc::EthereumRuntimeRPCApi<Block>,
349 BE: Backend<Block> + 'static,
350 BE::State: StateBackend<BlakeTwo256>,
351{
352 let frontier_backend = match rpc_config.frontier_backend_config {
353 FrontierBackendConfig::KeyValue => {
354 fc_db::Backend::KeyValue(Arc::new(fc_db::kv::Backend::<Block, C>::new(
355 client,
356 &fc_db::kv::DatabaseSettings {
357 source: match config.database {
358 DatabaseSource::RocksDb { .. } => DatabaseSource::RocksDb {
359 path: frontier_database_dir(config, "db"),
360 cache_size: 0,
361 },
362 DatabaseSource::ParityDb { .. } => DatabaseSource::ParityDb {
363 path: frontier_database_dir(config, "paritydb"),
364 },
365 DatabaseSource::Auto { .. } => DatabaseSource::Auto {
366 rocksdb_path: frontier_database_dir(config, "db"),
367 paritydb_path: frontier_database_dir(config, "paritydb"),
368 cache_size: 0,
369 },
370 _ => {
371 return Err(
372 "Supported db sources: `rocksdb` | `paritydb` | `auto`".to_string()
373 )
374 }
375 },
376 },
377 )?))
378 }
379 FrontierBackendConfig::Sql {
380 pool_size,
381 num_ops_timeout,
382 thread_count,
383 cache_size,
384 } => {
385 let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
386 let sqlite_db_path = frontier_database_dir(config, "sql");
387 std::fs::create_dir_all(&sqlite_db_path).expect("failed creating sql db directory");
388 let backend = futures::executor::block_on(fc_db::sql::Backend::new(
389 fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
390 path: Path::new("sqlite:///")
391 .join(sqlite_db_path)
392 .join("frontier.db3")
393 .to_str()
394 .expect("frontier sql path error"),
395 create_if_missing: true,
396 thread_count,
397 cache_size,
398 }),
399 pool_size,
400 std::num::NonZeroU32::new(num_ops_timeout),
401 overrides.clone(),
402 ))
403 .unwrap_or_else(|err| panic!("failed creating sql backend: {:?}", err));
404 fc_db::Backend::Sql(Arc::new(backend))
405 }
406 };
407
408 Ok(frontier_backend)
409}
410
411use sp_runtime::{traits::BlakeTwo256, DigestItem, Percent};
412
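/// Soft deadline, as a share of the block authoring duration, applied to the dev-service proposer.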
413pub const SOFT_DEADLINE_PERCENT: Percent = Percent::from_percent(100);
414
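/// Builds the client, backend, import queue and task manager used by chain operation subcommands,
/// dispatching on the chain spec to instantiate the matching runtime.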
415#[allow(clippy::type_complexity)]
417pub fn new_chain_ops(
418 config: &mut Configuration,
419 rpc_config: &RpcConfig,
420 node_extra_args: NodeExtraArgs,
421) -> Result<
422 (
423 Arc<Client>,
424 Arc<FullBackend>,
425 sc_consensus::BasicQueue<Block>,
426 TaskManager,
427 ),
428 ServiceError,
429> {
430 match &config.chain_spec {
431 #[cfg(feature = "moonriver-native")]
432 spec if spec.is_moonriver() => new_chain_ops_inner::<
433 moonriver_runtime::RuntimeApi,
434 MoonriverCustomizations,
435 >(config, rpc_config, node_extra_args),
436 #[cfg(feature = "moonbeam-native")]
437 spec if spec.is_moonbeam() => new_chain_ops_inner::<
438 moonbeam_runtime::RuntimeApi,
439 MoonbeamCustomizations,
440 >(config, rpc_config, node_extra_args),
441 #[cfg(feature = "moonbase-native")]
442 _ => new_chain_ops_inner::<moonbase_runtime::RuntimeApi, MoonbaseCustomizations>(
443 config,
444 rpc_config,
445 node_extra_args,
446 ),
447 #[cfg(not(feature = "moonbase-native"))]
448 _ => panic!("invalid chain spec"),
449 }
450}
451
452#[allow(clippy::type_complexity)]
453fn new_chain_ops_inner<RuntimeApi, Customizations>(
454 config: &mut Configuration,
455 rpc_config: &RpcConfig,
456 node_extra_args: NodeExtraArgs,
457) -> Result<
458 (
459 Arc<Client>,
460 Arc<FullBackend>,
461 sc_consensus::BasicQueue<Block>,
462 TaskManager,
463 ),
464 ServiceError,
465>
466where
467 Client: From<Arc<crate::FullClient<RuntimeApi>>>,
468 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
469 RuntimeApi::RuntimeApi: RuntimeApiCollection,
470 Customizations: ClientCustomizations + 'static,
471{
472 config.keystore = sc_service::config::KeystoreConfig::InMemory;
473 let PartialComponents {
474 client,
475 backend,
476 import_queue,
477 task_manager,
478 ..
479 } = new_partial::<RuntimeApi, Customizations>(config, rpc_config, node_extra_args)?;
480 Ok((
481 Arc::new(Client::from(client)),
482 backend,
483 import_queue,
484 task_manager,
485 ))
486}
487
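/// Replaces the default Prometheus registry with one labelled with the chain id and, unless
/// `skip_prefix` is set, prefixed with "moonbeam".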
488fn set_prometheus_registry(
490 config: &mut Configuration,
491 skip_prefix: bool,
492) -> Result<(), ServiceError> {
493 if let Some(PrometheusConfig { registry, .. }) = config.prometheus_config.as_mut() {
494 let labels = hashmap! {
495 "chain".into() => config.chain_spec.id().into(),
496 };
497 let prefix = if skip_prefix {
498 None
499 } else {
500 Some("moonbeam".into())
501 };
502
503 *registry = Registry::new_custom(prefix, Some(labels))?;
504 }
505
506 Ok(())
507}
508
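/// Assembles the `PartialComponents` shared by the parachain and dev services: Wasm executor,
/// client, transaction pool, telemetry, Frontier backend, block import and nimbus import queue.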
509#[allow(clippy::type_complexity)]
514pub fn new_partial<RuntimeApi, Customizations>(
515 config: &mut Configuration,
516 rpc_config: &RpcConfig,
517 node_extra_args: NodeExtraArgs,
518) -> PartialComponentsResult<FullClient<RuntimeApi>, FullBackend>
519where
520 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
521 RuntimeApi::RuntimeApi: RuntimeApiCollection,
522 Customizations: ClientCustomizations + 'static,
523{
524 set_prometheus_registry(config, rpc_config.no_prometheus_prefix)?;
525
526 config.rpc.id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));
528
529 let telemetry = config
530 .telemetry_endpoints
531 .clone()
532 .filter(|x| !x.is_empty())
533 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
534 let worker = TelemetryWorker::new(16)?;
535 let telemetry = worker.handle().new_telemetry(endpoints);
536 Ok((worker, telemetry))
537 })
538 .transpose()?;
539
540 let heap_pages = config
541 .executor
542 .default_heap_pages
543 .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
544 extra_pages: h as _,
545 });
546 let mut wasm_builder = WasmExecutor::builder()
547 .with_execution_method(config.executor.wasm_method)
548 .with_onchain_heap_alloc_strategy(heap_pages)
549 .with_offchain_heap_alloc_strategy(heap_pages)
550 .with_ignore_onchain_heap_pages(true)
551 .with_max_runtime_instances(config.executor.max_runtime_instances)
552 .with_runtime_cache_size(config.executor.runtime_cache_size);
553
554 if let Some(ref wasmtime_precompiled_path) = config.executor.wasmtime_precompiled {
555 wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
556 }
557
558 let executor = wasm_builder.build();
559
560 let (client, backend, keystore_container, task_manager) =
561 sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
562 config,
563 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
564 executor,
565 true,
566 )?;
567
568 if let Some(block_number) = Customizations::first_block_number_compatible_with_ed25519_zebra() {
569 client
570 .execution_extensions()
571 .set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
572 Block,
573 sp_io::UseDalekExt,
574 >::new(block_number));
575 }
576
577 let client = Arc::new(client);
578
579 let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
580
581 let telemetry = telemetry.map(|(worker, telemetry)| {
582 task_manager
583 .spawn_handle()
584 .spawn("telemetry", None, worker.run());
585 telemetry
586 });
587
588 let maybe_select_chain = if config.chain_spec.is_dev() {
589 Some(sc_consensus::LongestChain::new(backend.clone()))
590 } else {
591 None
592 };
593
594 let transaction_pool = sc_transaction_pool::Builder::new(
595 task_manager.spawn_essential_handle(),
596 client.clone(),
597 config.role.is_authority().into(),
598 )
599 .with_options(config.transaction_pool.clone())
600 .with_prometheus(config.prometheus_registry())
601 .build();
602
603 let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
604 let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));
605
606 let frontier_backend = Arc::new(open_frontier_backend(client.clone(), config, rpc_config)?);
607 let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());
608 let block_import = MoonbeamBlockImport::new(
609 frontier_block_import.clone(),
610 client.clone(),
611 node_extra_args.authoring_policy,
612 config.chain_spec.is_dev(),
613 );
614
615 let create_inherent_data_providers = move |_, _| async move {
616 let time = sp_timestamp::InherentDataProvider::from_system_time();
617 Ok((time,))
618 };
619
620 let import_queue = {
621 let block_import_for_queue: Box<
622 dyn sc_consensus::BlockImport<Block, Error = sp_consensus::Error> + Send + Sync,
623 > = match &block_import {
624 MoonbeamBlockImport::Dev(bi) => Box::new(bi.clone()),
625 MoonbeamBlockImport::ParachainLookahead(bi) => {
626 if node_extra_args.legacy_block_import_strategy {
627 Box::new(TParachainBlockImport::new_with_delayed_best_block(
628 bi.clone(),
629 backend.clone(),
630 ))
631 } else {
632 Box::new(TParachainBlockImport::new(bi.clone(), backend.clone()))
633 }
634 }
635 MoonbeamBlockImport::ParachainSlotBased(bi, _handle) => {
636 if node_extra_args.legacy_block_import_strategy {
637 Box::new(TParachainBlockImport::new_with_delayed_best_block(
638 bi.clone(),
639 backend.clone(),
640 ))
641 } else {
642 Box::new(TParachainBlockImport::new(bi.clone(), backend.clone()))
643 }
644 }
645 };
646
647 nimbus_consensus::import_queue(
648 client.clone(),
649 block_import_for_queue,
650 create_inherent_data_providers,
651 &task_manager.spawn_essential_handle(),
652 config.prometheus_registry(),
653 node_extra_args.legacy_block_import_strategy,
654 false,
655 )?
656 };
657
658 Ok(PartialComponents {
659 backend,
660 client,
661 import_queue,
662 keystore_container,
663 task_manager,
664 transaction_pool: transaction_pool.into(),
665 select_chain: maybe_select_chain,
666 other: (
667 block_import,
668 filter_pool,
669 telemetry,
670 telemetry_worker_handle,
671 frontier_backend,
672 fee_history_cache,
673 ),
674 })
675}
676
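/// Builds the relay chain interface: a minimal RPC-backed node when external relay chain RPC
/// endpoints are configured, otherwise a full in-process relay chain node.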
677async fn build_relay_chain_interface(
678 polkadot_config: Configuration,
679 parachain_config: &Configuration,
680 telemetry_worker_handle: Option<TelemetryWorkerHandle>,
681 task_manager: &mut TaskManager,
682 collator_options: CollatorOptions,
683 hwbench: Option<sc_sysinfo::HwBench>,
684) -> RelayChainResult<(
685 Arc<(dyn RelayChainInterface + 'static)>,
686 Option<CollatorPair>,
687)> {
688 let result = if let cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =
689 collator_options.relay_chain_mode
690 {
691 build_minimal_relay_chain_node_with_rpc(
692 polkadot_config,
693 parachain_config.prometheus_registry(),
694 task_manager,
695 rpc_target_urls,
696 )
697 .await
698 } else {
699 build_inprocess_relay_chain(
700 polkadot_config,
701 parachain_config,
702 telemetry_worker_handle,
703 task_manager,
704 hwbench,
705 )
706 };
707
708 result
710 .map(|(relay_chain_interface, collator_pair, _, _)| (relay_chain_interface, collator_pair))
711}
712
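/// Starts a parachain node on top of the given configurations: builds the network, spawns the
/// Frontier/RPC tasks and relay chain tasks, and starts parachain consensus when running as a
/// collator.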
713#[sc_tracing::logging::prefix_logs_with("🌗")]
717async fn start_node_impl<RuntimeApi, Customizations, Net>(
718 parachain_config: Configuration,
719 polkadot_config: Configuration,
720 collator_options: CollatorOptions,
721 para_id: ParaId,
722 rpc_config: RpcConfig,
723 block_authoring_duration: Duration,
724 hwbench: Option<sc_sysinfo::HwBench>,
725 node_extra_args: NodeExtraArgs,
726) -> sc_service::error::Result<(TaskManager, Arc<FullClient<RuntimeApi>>)>
727where
728 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
729 RuntimeApi::RuntimeApi: RuntimeApiCollection
730 + cumulus_primitives_core::GetCoreSelectorApi<Block>
731 + cumulus_primitives_core::RelayParentOffsetApi<Block>,
732 Customizations: ClientCustomizations + 'static,
733 Net: NetworkBackend<Block, Hash>,
734{
735 let mut parachain_config = prepare_node_config(parachain_config);
736
737 let params = new_partial::<RuntimeApi, Customizations>(
738 &mut parachain_config,
739 &rpc_config,
740 node_extra_args.clone(),
741 )?;
742 let (
743 block_import,
744 filter_pool,
745 mut telemetry,
746 telemetry_worker_handle,
747 frontier_backend,
748 fee_history_cache,
749 ) = params.other;
750
751 let client = params.client.clone();
752 let backend = params.backend.clone();
753 let mut task_manager = params.task_manager;
754
755 let (relay_chain_interface, collator_key) = build_relay_chain_interface(
756 polkadot_config,
757 &parachain_config,
758 telemetry_worker_handle,
759 &mut task_manager,
760 collator_options.clone(),
761 hwbench.clone(),
762 )
763 .await
764 .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
765
766 let force_authoring = parachain_config.force_authoring;
767 let collator = parachain_config.role.is_authority();
768 let prometheus_registry = parachain_config.prometheus_registry().cloned();
769 let transaction_pool = params.transaction_pool.clone();
770 let import_queue_service = params.import_queue.service();
771 let net_config = FullNetworkConfiguration::<_, _, Net>::new(
772 &parachain_config.network,
773 prometheus_registry.clone(),
774 );
775
776 let (network, system_rpc_tx, tx_handler_controller, sync_service) =
777 cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
778 parachain_config: &parachain_config,
779 client: client.clone(),
780 transaction_pool: transaction_pool.clone(),
781 spawn_handle: task_manager.spawn_handle(),
782 import_queue: params.import_queue,
783 para_id: para_id.clone(),
784 relay_chain_interface: relay_chain_interface.clone(),
785 net_config,
786 sybil_resistance_level: CollatorSybilResistance::Resistant,
787 metrics: Net::register_notification_metrics(
788 parachain_config
789 .prometheus_config
790 .as_ref()
791 .map(|config| &config.registry),
792 ),
793 })
794 .await?;
795
796 let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
797 let fee_history_limit = rpc_config.fee_history_limit;
798
799 let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
806 fc_mapping_sync::EthereumBlockNotification<Block>,
807 > = Default::default();
808 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
809
810 rpc::spawn_essential_tasks(
811 rpc::SpawnTasksParams {
812 task_manager: &task_manager,
813 client: client.clone(),
814 substrate_backend: backend.clone(),
815 frontier_backend: frontier_backend.clone(),
816 filter_pool: filter_pool.clone(),
817 overrides: overrides.clone(),
818 fee_history_limit,
819 fee_history_cache: fee_history_cache.clone(),
820 },
821 sync_service.clone(),
822 pubsub_notification_sinks.clone(),
823 );
824
825 let ethapi_cmd = rpc_config.ethapi.clone();
826 let tracing_requesters =
827 if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
828 rpc::tracing::spawn_tracing_tasks(
829 &rpc_config,
830 prometheus_registry.clone(),
831 rpc::SpawnTasksParams {
832 task_manager: &task_manager,
833 client: client.clone(),
834 substrate_backend: backend.clone(),
835 frontier_backend: frontier_backend.clone(),
836 filter_pool: filter_pool.clone(),
837 overrides: overrides.clone(),
838 fee_history_limit,
839 fee_history_cache: fee_history_cache.clone(),
840 },
841 )
842 } else {
843 rpc::tracing::RpcRequesters {
844 debug: None,
845 trace: None,
846 }
847 };
848
849 let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
850 task_manager.spawn_handle(),
851 overrides.clone(),
852 rpc_config.eth_log_block_cache,
853 rpc_config.eth_statuses_cache,
854 prometheus_registry.clone(),
855 ));
856
857 let rpc_builder = {
858 let client = client.clone();
859 let pool = transaction_pool.clone();
860 let network = network.clone();
861 let sync = sync_service.clone();
862 let filter_pool = filter_pool.clone();
863 let frontier_backend = frontier_backend.clone();
864 let backend = backend.clone();
865 let ethapi_cmd = ethapi_cmd.clone();
866 let max_past_logs = rpc_config.max_past_logs;
867 let max_block_range = rpc_config.max_block_range;
868 let overrides = overrides.clone();
869 let fee_history_cache = fee_history_cache.clone();
870 let block_data_cache = block_data_cache.clone();
871 let pubsub_notification_sinks = pubsub_notification_sinks.clone();
872
873 let keystore = params.keystore_container.keystore();
874 move |subscription_task_executor| {
875 #[cfg(feature = "moonbase-native")]
876 let forced_parent_hashes = {
877 let mut forced_parent_hashes = BTreeMap::new();
878 forced_parent_hashes.insert(
881 H256::from_str(
882 "0xa352fee3eef9c554a31ec0612af887796a920613358abf3353727760ea14207b",
883 )
884 .expect("must be valid hash"),
885 H256::from_str(
886 "0x0d0fd88778aec08b3a83ce36387dbf130f6f304fc91e9a44c9605eaf8a80ce5d",
887 )
888 .expect("must be valid hash"),
889 );
890 Some(forced_parent_hashes)
891 };
892 #[cfg(not(feature = "moonbase-native"))]
893 let forced_parent_hashes = None;
894
895 let deps = rpc::FullDeps {
896 backend: backend.clone(),
897 client: client.clone(),
898 command_sink: None,
899 ethapi_cmd: ethapi_cmd.clone(),
900 filter_pool: filter_pool.clone(),
901 frontier_backend: match &*frontier_backend {
902 fc_db::Backend::KeyValue(b) => b.clone(),
903 fc_db::Backend::Sql(b) => b.clone(),
904 },
905 graph: pool.clone(),
906 pool: pool.clone(),
907 is_authority: collator,
908 max_past_logs,
909 max_block_range,
910 fee_history_limit,
911 fee_history_cache: fee_history_cache.clone(),
912 network: network.clone(),
913 sync: sync.clone(),
914 dev_rpc_data: None,
915 block_data_cache: block_data_cache.clone(),
916 overrides: overrides.clone(),
917 forced_parent_hashes,
918 };
919 let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
920 client.clone(),
921 keystore.clone(),
922 ));
923 if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
924 rpc::create_full(
925 deps,
926 subscription_task_executor,
927 Some(crate::rpc::TracingConfig {
928 tracing_requesters: tracing_requesters.clone(),
929 trace_filter_max_count: rpc_config.ethapi_trace_max_count,
930 }),
931 pubsub_notification_sinks.clone(),
932 pending_consensus_data_provider,
933 para_id,
934 )
935 .map_err(Into::into)
936 } else {
937 rpc::create_full(
938 deps,
939 subscription_task_executor,
940 None,
941 pubsub_notification_sinks.clone(),
942 pending_consensus_data_provider,
943 para_id,
944 )
945 .map_err(Into::into)
946 }
947 }
948 };
949
950 sc_service::spawn_tasks(sc_service::SpawnTasksParams {
951 rpc_builder: Box::new(rpc_builder),
952 client: client.clone(),
953 transaction_pool: transaction_pool.clone(),
954 task_manager: &mut task_manager,
955 config: parachain_config,
956 keystore: params.keystore_container.keystore(),
957 backend: backend.clone(),
958 network: network.clone(),
959 sync_service: sync_service.clone(),
960 system_rpc_tx,
961 tx_handler_controller,
962 telemetry: telemetry.as_mut(),
963 })?;
964
965 if let Some(hwbench) = hwbench {
966 sc_sysinfo::print_hwbench(&hwbench);
967
968 if let Some(ref mut telemetry) = telemetry {
969 let telemetry_handle = telemetry.handle();
970 task_manager.spawn_handle().spawn(
971 "telemetry_hwbench",
972 None,
973 sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
974 );
975 }
976 }
977
978 let announce_block = {
979 let sync_service = sync_service.clone();
980 Arc::new(move |hash, data| sync_service.announce_block(hash, data))
981 };
982
983 let relay_chain_slot_duration = Duration::from_secs(6);
984 let overseer_handle = relay_chain_interface
985 .overseer_handle()
986 .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
987
988 start_relay_chain_tasks(StartRelayChainTasksParams {
989 client: client.clone(),
990 announce_block: announce_block.clone(),
991 para_id,
992 relay_chain_interface: relay_chain_interface.clone(),
993 task_manager: &mut task_manager,
994 da_recovery_profile: if collator {
995 DARecoveryProfile::Collator
996 } else {
997 DARecoveryProfile::FullNode
998 },
999 import_queue: import_queue_service,
1000 relay_chain_slot_duration,
1001 recovery_handle: Box::new(overseer_handle.clone()),
1002 sync_service: sync_service.clone(),
1003 prometheus_registry: prometheus_registry.as_ref(),
1004 })?;
1005
1006 if matches!(block_import, MoonbeamBlockImport::Dev(_)) {
1007 return Err(sc_service::Error::Other(
1008 "Block import pipeline is not for parachain".into(),
1009 ));
1010 }
1011
1012 if collator {
1013 start_consensus::<RuntimeApi, _>(
1014 backend.clone(),
1015 client.clone(),
1016 block_import,
1017 prometheus_registry.as_ref(),
1018 telemetry.as_ref().map(|t| t.handle()),
1019 &task_manager,
1020 relay_chain_interface.clone(),
1021 transaction_pool,
1022 params.keystore_container.keystore(),
1023 para_id,
1024 collator_key.expect("Command line arguments do not allow this. qed"),
1025 overseer_handle,
1026 announce_block,
1027 force_authoring,
1028 relay_chain_slot_duration,
1029 block_authoring_duration,
1030 sync_service.clone(),
1031 node_extra_args,
1032 )?;
1033 }
1034
1035 Ok((task_manager, client))
1036}
1037
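/// Spawns the nimbus collator, choosing the lookahead or slot-based variant according to the
/// block import selected by the authoring policy.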
1038fn start_consensus<RuntimeApi, SO>(
1039 backend: Arc<FullBackend>,
1040 client: Arc<FullClient<RuntimeApi>>,
1041 block_import: MoonbeamBlockImport<FullClient<RuntimeApi>>,
1042 prometheus_registry: Option<&Registry>,
1043 telemetry: Option<TelemetryHandle>,
1044 task_manager: &TaskManager,
1045 relay_chain_interface: Arc<dyn RelayChainInterface>,
1046 transaction_pool: Arc<
1047 sc_transaction_pool::TransactionPoolHandle<Block, FullClient<RuntimeApi>>,
1048 >,
1049 keystore: KeystorePtr,
1050 para_id: ParaId,
1051 collator_key: CollatorPair,
1052 overseer_handle: OverseerHandle,
1053 announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
1054 force_authoring: bool,
1055 relay_chain_slot_duration: Duration,
1056 block_authoring_duration: Duration,
1057 sync_oracle: SO,
1058 node_extra_args: NodeExtraArgs,
1059) -> Result<(), sc_service::Error>
1060where
1061 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
1062 RuntimeApi::RuntimeApi: RuntimeApiCollection
1063 + cumulus_primitives_core::GetCoreSelectorApi<Block>
1064 + cumulus_primitives_core::RelayParentOffsetApi<Block>,
1065 sc_client_api::StateBackendFor<FullBackend, Block>: sc_client_api::StateBackend<BlakeTwo256>,
1066 SO: SyncOracle + Send + Sync + Clone + 'static,
1067{
1068 let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
1069 task_manager.spawn_handle(),
1070 client.clone(),
1071 transaction_pool,
1072 prometheus_registry,
1073 telemetry.clone(),
1074 );
1075
1076 let proposer = Proposer::new(proposer_factory);
1077 let collator_service = CollatorService::new(
1078 client.clone(),
1079 Arc::new(task_manager.spawn_handle()),
1080 announce_block,
1081 client.clone(),
1082 );
1083
1084 fn create_inherent_data_providers<A, B>(
1085 _: A,
1086 _: B,
1087 ) -> impl futures::Future<
1088 Output = Result<
1089 (
1090 nimbus_primitives::InherentDataProvider,
1091 session_keys_primitives::InherentDataProvider,
1092 ),
1093 Box<dyn std::error::Error + Send + Sync>,
1094 >,
1095 > {
1096 async move {
1097 let author = nimbus_primitives::InherentDataProvider;
1098 let randomness = session_keys_primitives::InherentDataProvider;
1099 Ok((author, randomness))
1100 }
1101 }
1102
1103 let client_clone = client.clone();
1104 let keystore_clone = keystore.clone();
1105 let maybe_provide_vrf_digest =
1106 move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
1107 moonbeam_vrf::vrf_pre_digest::<Block, FullClient<RuntimeApi>>(
1108 &client_clone,
1109 &keystore_clone,
1110 nimbus_id,
1111 parent,
1112 )
1113 };
1114
1115 log::info!("Collator started with asynchronous backing.");
1116 let client_clone = client.clone();
1117 let code_hash_provider = move |block_hash| {
1118 client_clone
1119 .code_at(block_hash)
1120 .ok()
1121 .map(polkadot_primitives::ValidationCode)
1122 .map(|c| c.hash())
1123 };
1124
1125 match block_import {
1126 MoonbeamBlockImport::ParachainLookahead(bi) => {
1127 let block_import = if node_extra_args.legacy_block_import_strategy {
1128 TParachainBlockImport::new_with_delayed_best_block(bi, backend.clone())
1129 } else {
1130 TParachainBlockImport::new(bi, backend.clone())
1131 };
1132
1133 let params = nimbus_consensus::collators::lookahead::Params {
1134 additional_digests_provider: maybe_provide_vrf_digest,
1135 additional_relay_keys: vec![relay_chain::well_known_keys::EPOCH_INDEX.to_vec()],
1136 authoring_duration: block_authoring_duration,
1137 block_import,
1138 code_hash_provider,
1139 collator_key,
1140 collator_service,
1141 create_inherent_data_providers,
1142 force_authoring,
1143 keystore,
1144 overseer_handle,
1145 para_backend: backend,
1146 para_client: client,
1147 para_id,
1148 proposer,
1149 relay_chain_slot_duration,
1150 relay_client: relay_chain_interface,
1151 slot_duration: None,
1152 sync_oracle,
1153 reinitialize: false,
1154 max_pov_percentage: node_extra_args.max_pov_percentage,
1155 };
1156
1157 task_manager.spawn_essential_handle().spawn(
1158 "nimbus",
1159 None,
1160 nimbus_consensus::collators::lookahead::run::<
1161 Block,
1162 _,
1163 _,
1164 _,
1165 FullBackend,
1166 _,
1167 _,
1168 _,
1169 _,
1170 _,
1171 _,
1172 >(params),
1173 );
1174 }
1175 MoonbeamBlockImport::ParachainSlotBased(bi, handle) => {
1176 let block_import = if node_extra_args.legacy_block_import_strategy {
1177 TParachainBlockImport::new_with_delayed_best_block(bi, backend.clone())
1178 } else {
1179 TParachainBlockImport::new(bi, backend.clone())
1180 };
1181
1182 nimbus_consensus::collators::slot_based::run::<
1183 Block,
1184 nimbus_primitives::NimbusPair,
1185 _,
1186 _,
1187 _,
1188 FullBackend,
1189 _,
1190 _,
1191 _,
1192 _,
1193 _,
1194 _,
1195 >(nimbus_consensus::collators::slot_based::Params {
1196 additional_digests_provider: maybe_provide_vrf_digest,
1197 additional_relay_state_keys: vec![
1198 relay_chain::well_known_keys::EPOCH_INDEX.to_vec()
1199 ],
1200 authoring_duration: block_authoring_duration,
1201 block_import,
1202 code_hash_provider,
1203 collator_key,
1204 collator_service,
1205 create_inherent_data_providers: move |b, a| async move {
1206 create_inherent_data_providers(b, a).await
1207 },
1208 force_authoring,
1209 keystore,
1210 para_backend: backend,
1211 para_client: client,
1212 para_id,
1213 proposer,
1214 relay_chain_slot_duration,
1215 relay_client: relay_chain_interface,
1216 para_slot_duration: None,
1217 reinitialize: false,
1218 max_pov_percentage: node_extra_args.max_pov_percentage.map(|p| p as u32),
1219 export_pov: node_extra_args.export_pov,
1220 slot_offset: Duration::from_secs(1),
1221 spawner: task_manager.spawn_essential_handle(),
1222 block_import_handle: handle,
1223 });
1224 }
1225 MoonbeamBlockImport::Dev(_) => {
1226 return Err(sc_service::Error::Other(
1227 "Dev block import should not be used in parachain consensus".into(),
1228 ))
1229 }
1230 }
1231
1232 Ok(())
1233}
1234
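/// Starts a parachain node using `sc_network::NetworkWorker` as the network backend.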
1235#[rustfmt::skip]
1238pub async fn start_node<RuntimeApi, Customizations>(
1239 parachain_config: Configuration,
1240 polkadot_config: Configuration,
1241 collator_options: CollatorOptions,
1242 para_id: ParaId,
1243 rpc_config: RpcConfig,
1244 block_authoring_duration: Duration,
1245 hwbench: Option<sc_sysinfo::HwBench>,
1246 node_extra_args: NodeExtraArgs,
1247) -> sc_service::error::Result<(TaskManager, Arc<FullClient<RuntimeApi>>)>
1248where
1249 RuntimeApi:
1250 ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
1251 RuntimeApi::RuntimeApi:
1252 RuntimeApiCollection + cumulus_primitives_core::GetCoreSelectorApi<Block>
1253 + cumulus_primitives_core::RelayParentOffsetApi<Block>,
1254 Customizations: ClientCustomizations + 'static,
1255{
1256 start_node_impl::<RuntimeApi, Customizations, sc_network::NetworkWorker<_, _>>(
1257 parachain_config,
1258 polkadot_config,
1259 collator_options,
1260 para_id,
1261 rpc_config,
1262 block_authoring_duration,
1263 hwbench,
1264 node_extra_args
1265 )
1266 .await
1267}
1268
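/// Builds and runs a development node that seals blocks manually, instantly or on an interval,
/// and mocks the parachain inherents (relay data, XCM channels) instead of using a relay chain.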
1269pub async fn new_dev<RuntimeApi, Customizations, Net>(
1272 mut config: Configuration,
1273 para_id: Option<u32>,
1274 _author_id: Option<NimbusId>,
1275 sealing: moonbeam_cli_opt::Sealing,
1276 rpc_config: RpcConfig,
1277 hwbench: Option<sc_sysinfo::HwBench>,
1278 node_extra_args: NodeExtraArgs,
1279) -> Result<TaskManager, ServiceError>
1280where
1281 RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
1282 RuntimeApi::RuntimeApi: RuntimeApiCollection,
1283 Customizations: ClientCustomizations + 'static,
1284 Net: NetworkBackend<Block, Hash>,
1285{
1286 use async_io::Timer;
1287 use futures::Stream;
1288 use sc_consensus_manual_seal::{run_manual_seal, EngineCommand, ManualSealParams};
1289
1290 let sc_service::PartialComponents {
1291 client,
1292 backend,
1293 mut task_manager,
1294 import_queue,
1295 keystore_container,
1296 select_chain: maybe_select_chain,
1297 transaction_pool,
1298 other:
1299 (
1300 block_import_pipeline,
1301 filter_pool,
1302 mut telemetry,
1303 _telemetry_worker_handle,
1304 frontier_backend,
1305 fee_history_cache,
1306 ),
1307 } = new_partial::<RuntimeApi, Customizations>(&mut config, &rpc_config, node_extra_args)?;
1308
1309 let block_import = if let MoonbeamBlockImport::Dev(block_import) = block_import_pipeline {
1310 block_import
1311 } else {
1312 return Err(ServiceError::Other(
1313 "Block import pipeline is not dev".to_string(),
1314 ));
1315 };
1316
1317 let prometheus_registry = config.prometheus_registry().cloned();
1318 let net_config =
1319 FullNetworkConfiguration::<_, _, Net>::new(&config.network, prometheus_registry.clone());
1320
1321 let metrics = Net::register_notification_metrics(
1322 config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
1323 );
1324
1325 let (network, system_rpc_tx, tx_handler_controller, sync_service) =
1326 sc_service::build_network(sc_service::BuildNetworkParams {
1327 config: &config,
1328 client: client.clone(),
1329 transaction_pool: transaction_pool.clone(),
1330 spawn_handle: task_manager.spawn_handle(),
1331 import_queue,
1332 block_announce_validator_builder: None,
1333 warp_sync_config: None,
1334 net_config,
1335 block_relay: None,
1336 metrics,
1337 })?;
1338
1339 if config.offchain_worker.enabled {
1340 task_manager.spawn_handle().spawn(
1341 "offchain-workers-runner",
1342 "offchain-work",
1343 sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
1344 runtime_api_provider: client.clone(),
1345 keystore: Some(keystore_container.keystore()),
1346 offchain_db: backend.offchain_storage(),
1347 transaction_pool: Some(OffchainTransactionPoolFactory::new(
1348 transaction_pool.clone(),
1349 )),
1350 network_provider: Arc::new(network.clone()),
1351 is_validator: config.role.is_authority(),
1352 enable_http_requests: true,
1353 custom_extensions: move |_| vec![],
1354 })?
1355 .run(client.clone(), task_manager.spawn_handle())
1356 .boxed(),
1357 );
1358 }
1359
1360 let prometheus_registry = config.prometheus_registry().cloned();
1361 let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
1362 let fee_history_limit = rpc_config.fee_history_limit;
1363 let mut command_sink = None;
1364 let mut dev_rpc_data = None;
1365 let collator = config.role.is_authority();
1366
1367 let parachain_id: ParaId = para_id
1368 .expect("para ID should be specified for dev service")
1369 .into();
1370
1371 if collator {
1372 let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
1373 task_manager.spawn_handle(),
1374 client.clone(),
1375 transaction_pool.clone(),
1376 prometheus_registry.as_ref(),
1377 telemetry.as_ref().map(|x| x.handle()),
1378 );
1379 env.set_soft_deadline(SOFT_DEADLINE_PERCENT);
1380
1381 let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
1382 match sealing {
1383 moonbeam_cli_opt::Sealing::Instant => {
1384 Box::new(
1385 transaction_pool.import_notification_stream().map(|_| {
1387 EngineCommand::SealNewBlock {
1388 create_empty: false,
1389 finalize: false,
1390 parent_hash: None,
1391 sender: None,
1392 }
1393 }),
1394 )
1395 }
1396 moonbeam_cli_opt::Sealing::Manual => {
1397 let (sink, stream) = futures::channel::mpsc::channel(1000);
1398 command_sink = Some(sink);
1400 Box::new(stream)
1401 }
1402 moonbeam_cli_opt::Sealing::Interval(millis) => Box::new(StreamExt::map(
1403 Timer::interval(Duration::from_millis(millis)),
1404 |_| EngineCommand::SealNewBlock {
1405 create_empty: true,
1406 finalize: false,
1407 parent_hash: None,
1408 sender: None,
1409 },
1410 )),
1411 };
1412
1413 let select_chain = maybe_select_chain.expect(
1414 "`new_partial` builds a `LongestChainRule` when building dev service.\
1415 We specified the dev service when calling `new_partial`.\
1416 Therefore, a `LongestChainRule` is present. qed.",
1417 );
1418
1419 let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
1421 let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
1422 let additional_relay_offset = Arc::new(std::sync::atomic::AtomicU32::new(0));
1423 dev_rpc_data = Some((
1424 downward_xcm_sender,
1425 hrmp_xcm_sender,
1426 additional_relay_offset.clone(),
1427 ));
1428
1429 let client_vrf = client.clone();
1432
1433 let keystore_clone = keystore_container.keystore().clone();
1434 let maybe_provide_vrf_digest =
1435 move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
1436 moonbeam_vrf::vrf_pre_digest::<Block, FullClient<RuntimeApi>>(
1437 &client_vrf,
1438 &keystore_clone,
1439 nimbus_id,
1440 parent,
1441 )
1442 };
1443
1444 let client_for_cidp = client.clone();
1447
1448 task_manager.spawn_essential_handle().spawn_blocking(
1449 "authorship_task",
1450 Some("block-authoring"),
1451 run_manual_seal(ManualSealParams {
1452 block_import,
1453 env,
1454 client: client.clone(),
1455 pool: transaction_pool.clone(),
1456 commands_stream,
1457 select_chain,
1458 consensus_data_provider: Some(Box::new(NimbusManualSealConsensusDataProvider {
1459 keystore: keystore_container.keystore(),
1460 client: client.clone(),
1461 additional_digests_provider: maybe_provide_vrf_digest,
1462 _phantom: Default::default(),
1463 })),
1464 create_inherent_data_providers: move |block: H256, ()| {
1465 let maybe_current_para_block = client_for_cidp.number(block);
1466 let maybe_current_para_head = client_for_cidp.expect_header(block);
1467 let downward_xcm_receiver = downward_xcm_receiver.clone();
1468 let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();
1469 let additional_relay_offset = additional_relay_offset.clone();
1470 let relay_slot_key = well_known_keys::CURRENT_SLOT.to_vec();
1471
1472 let client_for_xcm = client_for_cidp.clone();
1475
1476 async move {
1477 MockTimestampInherentDataProvider::advance_timestamp(
1478 RELAY_CHAIN_SLOT_DURATION_MILLIS,
1479 );
1480
1481 let current_para_block = maybe_current_para_block?
1482 .ok_or(sp_blockchain::Error::UnknownBlock(block.to_string()))?;
1483
1484 let current_para_block_head = Some(polkadot_primitives::HeadData(
1485 maybe_current_para_head?.encode(),
1486 ));
1487
1488 let timestamp = TIMESTAMP.load(Ordering::SeqCst);
1490 let slot = timestamp.saturating_div(RELAY_CHAIN_SLOT_DURATION_MILLIS);
1492
1493 let additional_key_values = vec![
1494 (relay_slot_key, Slot::from(slot).encode()),
1495 (
1496 relay_chain::well_known_keys::ACTIVE_CONFIG.to_vec(),
1497 AbridgedHostConfiguration {
1498 max_code_size: 3_145_728,
1499 max_head_data_size: 20_480,
1500 max_upward_queue_count: 174_762,
1501 max_upward_queue_size: 1_048_576,
1502 max_upward_message_size: 65_531,
1503 max_upward_message_num_per_candidate: 16,
1504 hrmp_max_message_num_per_candidate: 10,
1505 validation_upgrade_cooldown: 6,
1506 validation_upgrade_delay: 6,
1507 async_backing_params: AsyncBackingParams {
1508 max_candidate_depth: 3,
1509 allowed_ancestry_len: 2,
1510 },
1511 }
1512 .encode(),
1513 ),
1514 ];
1515
1516 let current_para_head = client_for_xcm
1517 .header(block)
1518 .expect("Header lookup should succeed")
1519 .expect("Header passed in as parent should be present in backend.");
1520
1521 let should_send_go_ahead = match client_for_xcm
1522 .runtime_api()
1523 .collect_collation_info(block, &current_para_head)
1524 {
1525 Ok(info) => info.new_validation_code.is_some(),
1526 Err(e) => {
1527 log::error!("Failed to collect collation info: {:?}", e);
1528
1529 false
1530 }
1531 };
1532
1533 let mocked_parachain = MockValidationDataInherentDataProvider {
1534 current_para_block,
1535 para_id: parachain_id,
1536 upgrade_go_ahead: should_send_go_ahead.then(|| {
1537 log::info!(
1538 "Detected pending validation code, sending go-ahead signal."
1539 );
1540
1541 UpgradeGoAhead::GoAhead
1542 }),
1543 current_para_block_head,
1544 relay_offset: additional_relay_offset.load(Ordering::SeqCst),
1545 relay_blocks_per_para_block: 1,
1546 para_blocks_per_relay_epoch: 10,
1547 relay_randomness_config: (),
1548 xcm_config: MockXcmConfig::new(
1549 &*client_for_xcm,
1550 block,
1551 Default::default(),
1552 ),
1553 raw_downward_messages: downward_xcm_receiver.drain().collect(),
1554 raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
1555 additional_key_values: Some(additional_key_values),
1556 };
1557
1558 let randomness = session_keys_primitives::InherentDataProvider;
1559
1560 Ok((
1561 MockTimestampInherentDataProvider,
1562 mocked_parachain,
1563 randomness,
1564 ))
1565 }
1566 },
1567 }),
1568 );
1569 }
1570
1571 let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
1578 fc_mapping_sync::EthereumBlockNotification<Block>,
1579 > = Default::default();
1580 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1581
1582 rpc::spawn_essential_tasks(
1583 rpc::SpawnTasksParams {
1584 task_manager: &task_manager,
1585 client: client.clone(),
1586 substrate_backend: backend.clone(),
1587 frontier_backend: frontier_backend.clone(),
1588 filter_pool: filter_pool.clone(),
1589 overrides: overrides.clone(),
1590 fee_history_limit,
1591 fee_history_cache: fee_history_cache.clone(),
1592 },
1593 sync_service.clone(),
1594 pubsub_notification_sinks.clone(),
1595 );
1596 let ethapi_cmd = rpc_config.ethapi.clone();
1597 let tracing_requesters =
1598 if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
1599 rpc::tracing::spawn_tracing_tasks(
1600 &rpc_config,
1601 prometheus_registry.clone(),
1602 rpc::SpawnTasksParams {
1603 task_manager: &task_manager,
1604 client: client.clone(),
1605 substrate_backend: backend.clone(),
1606 frontier_backend: frontier_backend.clone(),
1607 filter_pool: filter_pool.clone(),
1608 overrides: overrides.clone(),
1609 fee_history_limit,
1610 fee_history_cache: fee_history_cache.clone(),
1611 },
1612 )
1613 } else {
1614 rpc::tracing::RpcRequesters {
1615 debug: None,
1616 trace: None,
1617 }
1618 };
1619
1620 let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
1621 task_manager.spawn_handle(),
1622 overrides.clone(),
1623 rpc_config.eth_log_block_cache,
1624 rpc_config.eth_statuses_cache,
1625 prometheus_registry,
1626 ));
1627
1628 let rpc_builder = {
1629 let client = client.clone();
1630 let pool = transaction_pool.clone();
1631 let backend = backend.clone();
1632 let network = network.clone();
1633 let sync = sync_service.clone();
1634 let ethapi_cmd = ethapi_cmd.clone();
1635 let max_past_logs = rpc_config.max_past_logs;
1636 let max_block_range = rpc_config.max_block_range;
1637 let overrides = overrides.clone();
1638 let fee_history_cache = fee_history_cache.clone();
1639 let block_data_cache = block_data_cache.clone();
1640 let pubsub_notification_sinks = pubsub_notification_sinks.clone();
1641
1642 let keystore = keystore_container.keystore();
1643 move |subscription_task_executor| {
1644 let deps = rpc::FullDeps {
1645 backend: backend.clone(),
1646 client: client.clone(),
1647 command_sink: command_sink.clone(),
1648 ethapi_cmd: ethapi_cmd.clone(),
1649 filter_pool: filter_pool.clone(),
1650 frontier_backend: match &*frontier_backend {
1651 fc_db::Backend::KeyValue(b) => b.clone(),
1652 fc_db::Backend::Sql(b) => b.clone(),
1653 },
1654 graph: pool.clone(),
1655 pool: pool.clone(),
1656 is_authority: collator,
1657 max_past_logs,
1658 max_block_range,
1659 fee_history_limit,
1660 fee_history_cache: fee_history_cache.clone(),
1661 network: network.clone(),
1662 sync: sync.clone(),
1663 dev_rpc_data: dev_rpc_data.clone(),
1664 overrides: overrides.clone(),
1665 block_data_cache: block_data_cache.clone(),
1666 forced_parent_hashes: None,
1667 };
1668
1669 let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
1670 client.clone(),
1671 keystore.clone(),
1672 ));
1673 if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
1674 rpc::create_full(
1675 deps,
1676 subscription_task_executor,
1677 Some(crate::rpc::TracingConfig {
1678 tracing_requesters: tracing_requesters.clone(),
1679 trace_filter_max_count: rpc_config.ethapi_trace_max_count,
1680 }),
1681 pubsub_notification_sinks.clone(),
1682 pending_consensus_data_provider,
1683 parachain_id,
1684 )
1685 .map_err(Into::into)
1686 } else {
1687 rpc::create_full(
1688 deps,
1689 subscription_task_executor,
1690 None,
1691 pubsub_notification_sinks.clone(),
1692 pending_consensus_data_provider,
1693 parachain_id,
1694 )
1695 .map_err(Into::into)
1696 }
1697 }
1698 };
1699
1700 let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
1701 network,
1702 client,
1703 keystore: keystore_container.keystore(),
1704 task_manager: &mut task_manager,
1705 transaction_pool,
1706 rpc_builder: Box::new(rpc_builder),
1707 backend,
1708 system_rpc_tx,
1709 sync_service: sync_service.clone(),
1710 config,
1711 tx_handler_controller,
1712 telemetry: None,
1713 })?;
1714
1715 if let Some(hwbench) = hwbench {
1716 sc_sysinfo::print_hwbench(&hwbench);
1717
1718 if let Some(ref mut telemetry) = telemetry {
1719 let telemetry_handle = telemetry.handle();
1720 task_manager.spawn_handle().spawn(
1721 "telemetry_hwbench",
1722 None,
1723 sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
1724 );
1725 }
1726 }
1727
1728 log::info!("Development Service Ready");
1729
1730 Ok(task_manager)
1731}
1732
1733#[cfg(test)]
1734mod tests {
1735 use crate::chain_spec::moonbase::ChainSpec;
1736 use crate::chain_spec::Extensions;
1737 use jsonrpsee::server::BatchRequestConfig;
1738 use moonbase_runtime::AccountId;
1739 use prometheus::{proto::LabelPair, Counter};
1740 use sc_network::config::NetworkConfiguration;
1741 use sc_service::config::RpcConfiguration;
1742 use sc_service::ChainType;
1743 use sc_service::{
1744 config::{BasePath, DatabaseSource, KeystoreConfig},
1745 Configuration, Role,
1746 };
1747 use std::path::Path;
1748 use std::str::FromStr;
1749
1750 use super::*;
1751
1752 #[test]
1753 fn test_set_prometheus_registry_uses_moonbeam_prefix() {
1754 let counter_name = "my_counter";
1755 let expected_metric_name = "moonbeam_my_counter";
1756 let counter = Box::new(Counter::new(counter_name, "foobar").unwrap());
1757 let mut config = Configuration {
1758 prometheus_config: Some(PrometheusConfig::new_with_default_registry(
1759 "0.0.0.0:8080".parse().unwrap(),
1760 "".into(),
1761 )),
1762 ..test_config("test")
1763 };
1764
1765 set_prometheus_registry(&mut config, false).unwrap();
1766 let reg = config.prometheus_registry().unwrap();
1768 reg.register(counter.clone()).unwrap();
1769 counter.inc();
1770
1771 let actual_metric_name = reg.gather().first().unwrap().get_name().to_string();
1772 assert_eq!(actual_metric_name.as_str(), expected_metric_name);
1773 }
1774
1775 #[test]
1776 fn test_set_prometheus_registry_skips_moonbeam_prefix() {
1777 let counter_name = "my_counter";
1778 let counter = Box::new(Counter::new(counter_name, "foobar").unwrap());
1779 let mut config = Configuration {
1780 prometheus_config: Some(PrometheusConfig::new_with_default_registry(
1781 "0.0.0.0:8080".parse().unwrap(),
1782 "".into(),
1783 )),
1784 ..test_config("test")
1785 };
1786
1787 set_prometheus_registry(&mut config, true).unwrap();
1788 let reg = config.prometheus_registry().unwrap();
1790 reg.register(counter.clone()).unwrap();
1791 counter.inc();
1792
1793 let actual_metric_name = reg.gather().first().unwrap().get_name().to_string();
1794 assert_eq!(actual_metric_name.as_str(), counter_name);
1795 }
1796
1797 #[test]
1798 fn test_set_prometheus_registry_adds_chain_id_as_label() {
1799 let input_chain_id = "moonriver";
1800
1801 let mut expected_label = LabelPair::default();
1802 expected_label.set_name("chain".to_owned());
1803 expected_label.set_value("moonriver".to_owned());
1804 let expected_chain_label = Some(expected_label);
1805
1806 let counter = Box::new(Counter::new("foo", "foobar").unwrap());
1807 let mut config = Configuration {
1808 prometheus_config: Some(PrometheusConfig::new_with_default_registry(
1809 "0.0.0.0:8080".parse().unwrap(),
1810 "".into(),
1811 )),
1812 ..test_config(input_chain_id)
1813 };
1814
1815 set_prometheus_registry(&mut config, false).unwrap();
1816 let reg = config.prometheus_registry().unwrap();
1818 reg.register(counter.clone()).unwrap();
1819 counter.inc();
1820
1821 let actual_chain_label = reg
1822 .gather()
1823 .first()
1824 .unwrap()
1825 .get_metric()
1826 .first()
1827 .unwrap()
1828 .get_label()
1829 .into_iter()
1830 .find(|x| x.get_name() == "chain")
1831 .cloned();
1832
1833 assert_eq!(actual_chain_label, expected_chain_label);
1834 }
1835
1836 #[test]
1837 fn dalek_does_not_panic() {
1838 use futures::executor::block_on;
1839 use sc_block_builder::BlockBuilderBuilder;
1840 use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, DatabaseSource, PruningMode};
1841 use sp_api::ProvideRuntimeApi;
1842 use sp_consensus::BlockOrigin;
1843 use substrate_test_runtime::TestAPI;
1844 use substrate_test_runtime_client::runtime::Block;
1845 use substrate_test_runtime_client::{
1846 ClientBlockImportExt, TestClientBuilder, TestClientBuilderExt,
1847 };
1848
1849 fn zero_ed_pub() -> sp_core::ed25519::Public {
1850 sp_core::ed25519::Public::default()
1851 }
1852
1853 fn invalid_sig() -> sp_core::ed25519::Signature {
1857 let signature = hex_literal::hex!(
1858 "a25b94f9c64270fdfffa673f11cfe961633e3e4972e6940a3cf
1859 7351dd90b71447041a83583a52cee1cf21b36ba7fd1d0211dca58b48d997fc78d9bc82ab7a38e"
1860 );
1861 sp_core::ed25519::Signature::from_raw(signature[0..64].try_into().unwrap())
1862 }
1863
1864 let tmp = tempfile::tempdir().unwrap();
1865 let backend = Arc::new(
1866 Backend::new(
1867 DatabaseSettings {
1868 trie_cache_maximum_size: Some(1 << 20),
1869 state_pruning: Some(PruningMode::ArchiveAll),
1870 blocks_pruning: BlocksPruning::KeepAll,
1871 source: DatabaseSource::RocksDb {
1872 path: tmp.path().into(),
1873 cache_size: 1024,
1874 },
1875 metrics_registry: None,
1876 },
1877 u64::MAX,
1878 )
1879 .unwrap(),
1880 );
1881 let client = TestClientBuilder::with_backend(backend).build();
1882
1883 client
1884 .execution_extensions()
1885 .set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
1886 Block,
1887 sp_io::UseDalekExt,
1888 >::new(1));
1889
1890 let a1 = BlockBuilderBuilder::new(&client)
1891 .on_parent_block(client.chain_info().genesis_hash)
1892 .with_parent_block_number(0)
1893 .enable_proof_recording()
1895 .build()
1896 .unwrap()
1897 .build()
1898 .unwrap()
1899 .block;
1900
1901 block_on(client.import(BlockOrigin::NetworkInitialSync, a1.clone())).unwrap();
1902
1903 assert!(!client
1906 .runtime_api()
1907 .verify_ed25519(
1908 client.chain_info().genesis_hash,
1909 invalid_sig(),
1910 zero_ed_pub(),
1911 vec![]
1912 )
1913 .unwrap());
1914 }
1915
1916 fn test_config(chain_id: &str) -> Configuration {
1917 let network_config = NetworkConfiguration::new("", "", Default::default(), None);
1918 let runtime = tokio::runtime::Runtime::new().expect("failed creating tokio runtime");
1919 let spec = ChainSpec::builder(&[0u8], Extensions::default())
1920 .with_name("test")
1921 .with_id(chain_id)
1922 .with_chain_type(ChainType::Local)
1923 .with_genesis_config(moonbase_runtime::genesis_config_preset::testnet_genesis(
1924 AccountId::from_str("6Be02d1d3665660d22FF9624b7BE0551ee1Ac91b").unwrap(),
1925 vec![],
1926 vec![],
1927 vec![],
1928 vec![],
1929 vec![],
1930 ParaId::new(0),
1931 0,
1932 ))
1933 .build();
1934
1935 Configuration {
1936 impl_name: String::from("test-impl"),
1937 impl_version: String::from("0.1"),
1938 role: Role::Full,
1939 tokio_handle: runtime.handle().clone(),
1940 transaction_pool: Default::default(),
1941 network: network_config,
1942 keystore: KeystoreConfig::Path {
1943 path: "key".into(),
1944 password: None,
1945 },
1946 database: DatabaseSource::RocksDb {
1947 path: "db".into(),
1948 cache_size: 128,
1949 },
1950 trie_cache_maximum_size: Some(16777216),
1951 warm_up_trie_cache: None,
1952 state_pruning: Default::default(),
1953 blocks_pruning: sc_service::BlocksPruning::KeepAll,
1954 chain_spec: Box::new(spec),
1955 executor: Default::default(),
1956 wasm_runtime_overrides: Default::default(),
1957 rpc: RpcConfiguration {
1958 addr: None,
1959 max_connections: Default::default(),
1960 cors: None,
1961 methods: Default::default(),
1962 max_request_size: Default::default(),
1963 max_response_size: Default::default(),
1964 id_provider: None,
1965 max_subs_per_conn: Default::default(),
1966 port: Default::default(),
1967 message_buffer_capacity: Default::default(),
1968 batch_config: BatchRequestConfig::Unlimited,
1969 rate_limit: Default::default(),
1970 rate_limit_whitelisted_ips: vec![],
1971 rate_limit_trust_proxy_headers: false,
1972 },
1973 data_path: Default::default(),
1974 prometheus_config: None,
1975 telemetry_endpoints: None,
1976 offchain_worker: Default::default(),
1977 force_authoring: false,
1978 disable_grandpa: false,
1979 dev_key_seed: None,
1980 tracing_targets: None,
1981 tracing_receiver: Default::default(),
1982 announce_block: true,
1983 base_path: BasePath::new(Path::new("")),
1984 }
1985 }
1986}
1987
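/// Consensus data provider used for Frontier's "pending" block: it reuses the parent's nimbus
/// pre-runtime digest and regenerates the VRF pre-runtime digest for the pending block.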
1988struct PendingConsensusDataProvider<Client>
1989where
1990 Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
1991 Client::Api: VrfApi<Block>,
1992{
1993 client: Arc<Client>,
1994 keystore: Arc<dyn Keystore>,
1995}
1996
1997impl<Client> PendingConsensusDataProvider<Client>
1998where
1999 Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
2000 Client::Api: VrfApi<Block>,
2001{
2002 pub fn new(client: Arc<Client>, keystore: Arc<dyn Keystore>) -> Self {
2003 Self { client, keystore }
2004 }
2005}
2006
2007impl<Client> fc_rpc::pending::ConsensusDataProvider<Block> for PendingConsensusDataProvider<Client>
2008where
2009 Client: HeaderBackend<Block> + sp_api::ProvideRuntimeApi<Block> + Send + Sync,
2010 Client::Api: VrfApi<Block>,
2011{
2012 fn create_digest(
2013 &self,
2014 parent: &Header,
2015 _data: &sp_inherents::InherentData,
2016 ) -> Result<sp_runtime::Digest, sp_inherents::Error> {
2017 let hash = parent.hash();
2018 let mut digest = self
2020 .client
2021 .header(hash)
2022 .map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
2023 .ok_or(sp_inherents::Error::Application(
2024 "Best block header should be present".into(),
2025 ))?
2026 .digest;
2027 let nimbus_id = digest
2029 .logs
2030 .iter()
2031 .find_map(|x| {
2032 if let DigestItem::PreRuntime(nimbus_primitives::NIMBUS_ENGINE_ID, nimbus_id) = x {
2033 Some(NimbusId::from_slice(nimbus_id.as_slice()).map_err(|_| {
2034 sp_inherents::Error::Application(
2035 "Nimbus pre-runtime digest should be valid".into(),
2036 )
2037 }))
2038 } else {
2039 None
2040 }
2041 })
2042 .ok_or(sp_inherents::Error::Application(
2043 "Nimbus pre-runtime digest should be present".into(),
2044 ))??;
2045 let pos = digest.logs.iter().position(|x| {
2047 matches!(
2048 x,
2049 DigestItem::PreRuntime(session_keys_primitives::VRF_ENGINE_ID, _)
2050 )
2051 });
2052 if let Some(pos) = pos {
2053 digest.logs.remove(pos);
2054 }
2055 let vrf_digest = VrfDigestsProvider::new(self.client.clone(), self.keystore.clone())
2057 .provide_digests(nimbus_id, hash);
2058 digest.logs.extend(vrf_digest);
2060 Ok(digest)
2061 }
2062}