moonbeam_service/lazy_loading/
manual_sealing.rs1use cumulus_primitives_core::BlockT;
18use frame_benchmarking::__private::codec;
19use futures::{Stream, StreamExt, TryFutureExt};
20use sc_client_api::backend::Backend as ClientBackend;
21use sc_client_api::Finalizer;
22use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult, StateAction};
23use sc_consensus_manual_seal::{
24 finalize_block, rpc, CreatedBlock, EngineCommand, Error, FinalizeBlockParams, ManualSealParams,
25 SealBlockParams, MANUAL_SEAL_ENGINE_ID,
26};
27use sc_transaction_pool_api::TransactionPool;
28use sp_api::ProvideRuntimeApi;
29use sp_blockchain::HeaderBackend;
30use sp_consensus::{BlockOrigin, Environment, Proposer, SelectChain};
31use sp_inherents::{CreateInherentDataProviders, InherentDataProvider};
32use sp_runtime::traits::Header;
33use std::marker::PhantomData;
34use std::time::Duration;
35
36pub async fn run_manual_seal<B, BI, CB, E, C, TP, SC, CS, CIDP, P>(
37 ManualSealParams {
38 mut block_import,
39 mut env,
40 client,
41 pool,
42 mut commands_stream,
43 select_chain,
44 consensus_data_provider,
45 create_inherent_data_providers,
46 }: ManualSealParams<B, BI, E, C, TP, SC, CS, CIDP, P>,
47) where
48 B: BlockT + 'static,
49 BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
50 C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
51 CB: ClientBackend<B> + 'static,
52 E: Environment<B> + 'static,
53 E::Proposer: Proposer<B, Proof = P>,
54 CS: Stream<Item = EngineCommand<<B as BlockT>::Hash>> + Unpin + 'static,
55 SC: SelectChain<B> + 'static,
56 TP: TransactionPool<Block = B>,
57 CIDP: CreateInherentDataProviders<B, ()>,
58 P: codec::Encode + Send + Sync + 'static,
59{
60 while let Some(command) = commands_stream.next().await {
61 match command {
62 EngineCommand::SealNewBlock {
63 create_empty,
64 finalize,
65 parent_hash,
66 sender,
67 } => {
68 seal_block(SealBlockParams {
69 sender,
70 parent_hash,
71 finalize,
72 create_empty,
73 env: &mut env,
74 select_chain: &select_chain,
75 block_import: &mut block_import,
76 consensus_data_provider: consensus_data_provider.as_deref(),
77 pool: pool.clone(),
78 client: client.clone(),
79 create_inherent_data_providers: &create_inherent_data_providers,
80 })
81 .await;
82 }
83 EngineCommand::FinalizeBlock {
84 hash,
85 sender,
86 justification,
87 } => {
88 let justification = justification.map(|j| (MANUAL_SEAL_ENGINE_ID, j));
89 finalize_block(FinalizeBlockParams {
90 hash,
91 sender,
92 justification,
93 finalizer: client.clone(),
94 _phantom: PhantomData,
95 })
96 .await
97 }
98 }
99 }
100}
101
102pub const MAX_PROPOSAL_DURATION: u64 = 60;
104
105pub async fn seal_block<B, BI, SC, C, E, TP, CIDP, P>(
107 SealBlockParams {
108 create_empty,
109 finalize,
110 pool,
111 parent_hash,
112 client,
113 select_chain,
114 block_import,
115 env,
116 create_inherent_data_providers,
117 consensus_data_provider: digest_provider,
118 mut sender,
119 }: SealBlockParams<'_, B, BI, SC, C, E, TP, CIDP, P>,
120) where
121 B: BlockT,
122 BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
123 C: HeaderBackend<B> + ProvideRuntimeApi<B>,
124 E: Environment<B>,
125 E::Proposer: Proposer<B, Proof = P>,
126 TP: TransactionPool<Block = B>,
127 SC: SelectChain<B>,
128 CIDP: CreateInherentDataProviders<B, ()>,
129 P: codec::Encode + Send + Sync + 'static,
130{
131 let future = async {
132 if pool.status().ready == 0 && !create_empty {
133 return Err(Error::EmptyTransactionPool);
134 }
135
136 let parent = match parent_hash {
140 Some(hash) => client
141 .header(hash)?
142 .ok_or_else(|| Error::BlockNotFound(format!("{}", hash)))?,
143 None => select_chain.best_chain().await?,
144 };
145
146 let inherent_data_providers = create_inherent_data_providers
147 .create_inherent_data_providers(parent.hash(), ())
148 .await
149 .map_err(|e| Error::Other(e))?;
150
151 let inherent_data = inherent_data_providers.create_inherent_data().await?;
152
153 let proposer = env
154 .init(&parent)
155 .map_err(|err| Error::StringError(err.to_string()))
156 .await?;
157 let inherents_len = inherent_data.len();
158
159 let digest = if let Some(digest_provider) = digest_provider {
160 digest_provider.create_digest(&parent, &inherent_data)?
161 } else {
162 Default::default()
163 };
164
165 let proposal = proposer
166 .propose(
167 inherent_data.clone(),
168 digest,
169 Duration::from_secs(MAX_PROPOSAL_DURATION),
170 None,
171 )
172 .map_err(|err| Error::StringError(err.to_string()))
173 .await?;
174
175 if proposal.block.extrinsics().len() == inherents_len && !create_empty {
176 return Err(Error::EmptyTransactionPool);
177 }
178
179 let (header, body) = proposal.block.deconstruct();
180 let proof = proposal.proof;
181 let proof_size = proof.encoded_size();
182 let mut params = BlockImportParams::new(BlockOrigin::Own, header.clone());
183 params.body = Some(body);
184 params.finalized = finalize;
185 params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
186 params.state_action = StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(
187 proposal.storage_changes,
188 ));
189
190 if let Some(digest_provider) = digest_provider {
191 digest_provider.append_block_import(&parent, &mut params, &inherent_data, proof)?;
192 }
193
194 let mut post_header = header.clone();
197 post_header
198 .digest_mut()
199 .logs
200 .extend(params.post_digests.iter().cloned());
201
202 match block_import.import_block(params).await? {
203 ImportResult::Imported(aux) => Ok(CreatedBlock {
204 hash: <B as BlockT>::Header::hash(&post_header),
205 aux,
206 proof_size,
207 }),
208 other => Err(other.into()),
209 }
210 };
211
212 rpc::send_result(&mut sender, future.await)
213}