精通 Filecoin:Lotus真实数据处理之Provider处理存储
接上篇,当 Client
接收到用户的存储交易,创建一个 /fil/storage/mk/1.0.1
协议的流,然后通过流发送存储交易。处理这个协议的正是 HandleDealStream
方法。这个方法直接调用自身的 receiveDeal
方法进行处理。receiveDeal
方法处理如下:从流中读取存储提案
Proposal
对象。proposal, err := s.ReadDealProposal()
这里的流对象是
dealStream
对象(storagemarket/network/deal_stream.go),这个对象对原始流对象进行了封装。获取 ipld node 对象。
proposalNd, err := cborutil.AsIpld(proposal.DealProposal)
生成矿工交易对象。
deal := &storagemarket.MinerDeal{ Client: s.RemotePeer(), Miner: p.net.ID(), ClientDealProposal: *proposal.DealProposal, ProposalCid: proposalNd.Cid(), State: storagemarket.StorageDealUnknown, Ref: proposal.Piece, }
调用 fsm 状态组的
Begin
的方法,生成一个状态机,并开始跟踪矿工交易对象。err = p.deals.Begin(proposalNd.Cid(), deal)
保存流对象到连接管理器中。
err = p.conns.AddStream(proposalNd.Cid(), s)
发送事件到 fsm 状态组,从而开始对交易对象进行处理。
return p.deals.Send(proposalNd.Cid(), storagemarket.ProviderEventOpen)
当处理机收到
ProviderEventOpen
状态事件时,因为初始状态为默认值 0,即StorageDealUnknown
,事件处理器对象经过内部处理找到对应的目的状态为StorageDealValidating
,从而调用其处理函数ValidateDealProposal
函数进行处理。
1、`ValidateDealProposal` 函数
这个函数用来验证交易提案对象。
调用 Lotus Provider 适配器对象的
GetChainHead
方法,获取区块链顶部 tipset key 和其高度。tok, height, err := environment.Node().GetChainHead(ctx.Context())if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("node error getting most recent state id: %w", err)) }
验证客户发送的交易提案对象。如果验证不通过,则发送拒绝事件。
if err := providerutils.VerifyProposal(ctx.Context(), deal.ClientDealProposal, tok, environment.Node().VerifySignature); err != nil { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("verifying StorageDealProposal: %w", err)) }
检查交易提案中指定的矿工地址是否正确。如果不正确,则发送拒绝事件。
proposal := deal.Proposalif proposal.Provider != environment.Address() { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("incorrect provider for deal")) }
检查交易指定的高度是否正确。如果不正确,则发送拒绝事件。
if height > proposal.StartEpoch-environment.DealAcceptanceBuffer() { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("deal start epoch is too soon or deal already expired")) }
检查费用是否OK,如果不OK,则发送拒绝事件。
minPrice := big.Div(big.Mul(environment.Ask().Price, abi.NewTokenAmount(int64(proposal.PieceSize))), abi.NewTokenAmount(1<<30)) if proposal.StoragePricePerEpoch.LessThan(minPrice) { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("storage price per epoch less than asking price: %s < %s", proposal.StoragePricePerEpoch, minPrice)) }
检查交易的大小是否匹配。如果不匹配,则发送拒绝事件。
if proposal.PieceSize < environment.Ask().MinPieceSize { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("piece size less than minimum required size: %d < %d", proposal.PieceSize, environment.Ask().MinPieceSize)) }if proposal.PieceSize > environment.Ask().MaxPieceSize { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("piece size more than maximum allowed size: %d > %d", proposal.PieceSize, environment.Ask().MaxPieceSize)) }
获取客户的资金。
clientMarketBalance, err := environment.Node().GetBalance(ctx.Context(), proposal.Client, tok) if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("node error getting client market balance failed: %w", err)) }
如果客户可用资金小于总的交易费用,则发送拒绝事件。
if clientMarketBalance.Available.LessThan(proposal.TotalStorageFee()) { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.New("clientMarketBalance.Available too small")) }
如果交易是验证过的,则进行验证。
fsm 上下文对象的
Trigger
方法,发送事件。return ctx.Trigger(storagemarket.ProviderEventDealDeciding)
当状态机收到这个事件后,经过事件处理器把状态从
StorageDealUnknown
修改为StorageDealAcceptWait
,从而调用其处理函数DecideOnProposal
确定是否接收交易。
2、`DecideOnProposal` 函数
这个函数用来决定接受或拒绝交易。
调用环境对象的
RunCustomDecisionLogic
方法,运行自定义逻辑来验证是不接收客户交易。accept, reason, err := environment.RunCustomDecisionLogic(ctx.Context(), deal)if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("custom deal decision logic failed: %w", err)) }
如果不接收,则发送拒绝事件。
if !accept { return ctx.Trigger(storagemarket.ProviderEventDealRejected, fmt.Errorf(reason)) }
调用环境对象的
SendSignedResponse
方法,发送签名的响应给客户端。err = environment.SendSignedResponse(ctx.Context(), &network.Response{ State: storagemarket.StorageDealWaitingForData, Proposal: deal.ProposalCid, })if err != nil { return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) }
这个方法找到对应的流,然后对响应进行签名,生成签名的响应对象,最后通过流发送响应。
断开与客户端的连接。
if err := environment.Disconnect(deal.ProposalCid); err != nil { log.Warnf("closing client connection: %+v", err) }
调用 fsm 上下文对象的
Trigger
方法,发送一个事件。return ctx.Trigger(storagemarket.ProviderEventDataRequested)
当状态机收到这个事件后,经过事件处理器把状态从
StorageDealAcceptWait
修改为StorageDealWaitingForData
,因为没有指定的处理函数,从而不会调用函数进行处理,一直等待数据传输过程发送事件。当数据开始传输时,数据传输组件发送
ProviderEventDataTransferInitiated
事件,经过事件处理器把状态从StorageDealWaitingForData
修改为StorageDealTransferring
,因为没有指定的处理函数,从而不会调用函数进行处理,一直等待数据传输过程发送事件。当数据传输完成时,数据传输组件发送
ProviderEventDataTransferCompleted
事件,经过事件处理器把状态从StorageDealTransferring
修改为StorageDealVerifyData
,从而调用其处理函数VerifyData
验证数据。
3、`VerifyData` 函数
这个函数验证接受到的数据与交易提案中的 pieceCID 相匹配。
VerifyData
函数流程如下:
调用环境对象的
GeneratePieceCommitmentToFile
方法,生成碎片的 CID 、碎片所在目录和元数据目录。pieceCid, piecePath, metadataPath, err := environment.GeneratePieceCommitmentToFile(deal.Ref.Root, shared.AllSelector())
GeneratePieceCommitmentToFile
方法内容如下:调用文件存储对象的
CreateTemp
方法,创建一个临时文件。f, err := pio.store.CreateTemp()
生成一个清理函数。
cleanup := func() { f.Close() _ = pio.store.Delete(f.Path()) }
从底层存储对象中获取指定 CID 的内容,然后写入指定文件。
err = pio.carIO.WriteCar(context.Background(), pio.bs, payloadCid, selector, f, userOnNewCarBlocks...)
获取文件大小,即碎片大小。
pieceSize := uint64(f.Size())
定位到文件开头位置。
_, err = f.Seek(0, io.SeekStart)
使用文件内容生成碎片 ID。
commitment, paddedSize, err := GeneratePieceCommitment(rt, f, pieceSize)
关闭文件。
_ = f.Close()
返回碎片 CID 和文件路径。
return commitment, f.Path(), paddedSize, nil
如果矿工设置了
universalRetrievalEnabled
标志,则直接调用GeneratePieceCommitmentWithMetadata
函数进行处理。if p.p.universalRetrievalEnabled { return providerutils.GeneratePieceCommitmentWithMetadata(p.p.fs, p.p.pio.GeneratePieceCommitmentToFile, p.p.proofType, payloadCid, selector) }
universalRetrievalEnabled
标志如果为真,则存储矿工会跟踪碎片中的所有 CID,因此对于所有 CID 都可以被检索,而不仅是 Root CID。否则,调用 piece IO 对象的
GeneratePieceCommitmentToFile
方法进行处理。pieceCid, piecePath, _, err := p.p.pio.GeneratePieceCommitmentToFile(p.p.proofType, payloadCid, selector)
payloadCid
表示根 Root CID。piece IO 对象的
GeneratePieceCommitmentToFile
方法处理如下:返回碎片 CID 和碎片路径。
return pieceCid, piecePath, filestore.Path(""), err
验证生成的碎片 CID 和矿工交易中交易提案的碎片 CID是否一致。如果不一致,则发送拒绝事件。
if pieceCid != deal.Proposal.PieceCID { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("proposal CommP doesn't match calculated CommP")) }
3. 调用 fsm 上下文对象的 Trigger
方法,发送一个事件。
return ctx.Trigger(storagemarket.ProviderEventVerifiedData, piecePath, metadataPath)当状态机收到这个事件后,经过事件处理器把状态从 `StorageDealVerifyData` 修改为 `StorageDealEnsureProviderFunds`,从而调用其处理函数 `EnsureProviderFunds` 确定是否接收交易。同时,在调用处理函数之前,通过 `Action` 函数,修改矿工交易对象的 `PiecePath` 和 `MetadataPath` 两个属性。
4、`EnsureProviderFunds` 函数
这个函数用来确定矿工有足够的资金来处理当前交易。
获取 Lotus Provider 适配器。
node := environment.Node()
获取区块链顶部 tipset 对应的 key 和高度。
tok, _, err := node.GetChainHead(ctx.Context())if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("acquiring chain head: %w", err)) }
获取矿工的 worker 地址。
waddr, err := node.GetMinerWorkerAddress(ctx.Context(), deal.Proposal.Provider, tok)if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("looking up miner worker: %w", err)) }
调用 Lotus Provider 适配器的
EnsureFunds
方法,确保矿工有足够的资金来处理当前交易。mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Provider, waddr, deal.Proposal.ProviderCollateral, tok)if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("ensuring funds: %w", err)) }
如果返回的
mcid
是空的,那么意味着已经实时确认,则调用 fsm 上下文对象的Trigger
方法,发送一个事件。if mcid == cid.Undef { return ctx.Trigger(storagemarket.ProviderEventFunded) }
否则,调用 fsm 上下文对象的
Trigger
方法,发送另一个事件。return ctx.Trigger(storagemarket.ProviderEventFundingInitiated, mcid)
当状态机收到这个事件后,经过事件处理器把状态从
StorageDealEnsureProviderFunds
修改为StorageDealProviderFunding
,从而调用其处理函数WaitForFunding
等待产一步的消息上链。同时,在调用处理函数之前,通过Action
函数,修改矿工交易对象的PublishCid
属性。
5、`WaitForFunding` 函数
这个函数用来等待消息上链。消息上链之后,调用 fsm 上下文对象的 Trigger
方法,发送一个事件。
函数内容如下:
node := environment.Node()return node.WaitForMessage(ctx.Context(), *deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error { if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds errored: %w", err)) } if code != exitcode.Ok { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds exit code: %s", code.String())) } return ctx.Trigger(storagemarket.ProviderEventFunded) })
当状态机收到 ProviderEventFunded
这个事件后,经过事件处理器把状态从 StorageDealProviderFunding
修改为 StorageDealPublish
,从而调用其处理函数 PublishDeal
把交易信息上链。同时,在调用处理函数之前,通过 Action
函数,修改矿工交易对象的 PublishCid
属性。
6、`PublishDeal` 函数
这个函数主要用来提交交易信息上链。
生成矿工交易对象。
smDeal := storagemarket.MinerDeal{ Client: deal.Client, ClientDealProposal: deal.ClientDealProposal, ProposalCid: deal.ProposalCid, State: deal.State, Ref: deal.Ref, }
调用 Lotus Provider 适配器对象的
PublishDeals
把交易信息上链。mcid, err := environment.Node().PublishDeals(ctx.Context(), smDeal) if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("publishing deal: %w", err)) }
调用 fsm 上下文对象的
Trigger
方法,发送事件。return ctx.Trigger(storagemarket.ProviderEventDealPublishInitiated, mcid)
当状态机收到这个事件后,经过事件处理器把状态从
StorageDealPublish
修改为StorageDealPublishing
,从而调用其处理函数WaitForPublish
等待交易信息上链。
7、`WaitForPublish` 函数
这个函数用来等待交易信息上链,然后给客户端发送响应,然后断开与客户端的连接。最后调用 fsm 上下文对象的 Trigger
方法,通过事件处理生成一个事件对象,然后发送事件对象到状态机。此处生成的事件对象名称为 ProviderEventDealPublished
。
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealPublishing
修改为 StorageDealStaged
,从而调用其处理函数 HandoffDeal
开始扇区密封处理。同时,在调用处理函数之前,通过 Action
函数,修改矿工交易对象的 ConnectionClosed
和 DealID
属性。
return environment.Node().WaitForMessage(ctx.Context(), *deal.PublishCid, func(code exitcode.ExitCode, retBytes []byte, err error) error { if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals errored: %w", err)) } if code != exitcode.Ok { return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals exit code: %s", code.String())) } var retval market.PublishStorageDealsReturn err = retval.UnmarshalCBOR(bytes.NewReader(retBytes)) if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals error unmarshalling result: %w", err)) } return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0]) })
8、`HandoffDeal` 函数
这个函数调用 miner 的 Provide
适配器的
使用碎片路径生成文件对象。
file, err := environment.FileStore().Open(deal.PiecePath)if err != nil { return ctx.Trigger(storagemarket.ProviderEventFileStoreErrored, xerrors.Errorf("reading piece at path %s: %w", deal.PiecePath, err)) }
使用碎片文件流生成碎片流。
paddedReader, paddedSize := padreader.New(file, uint64(file.Size()))
调用 Lotus Provider 适配器对象的
OnDealComplete
方法,通知交易已经完成,从而把碎片加入某个扇区中。err = environment.Node().OnDealComplete( ctx.Context(), storagemarket.MinerDeal{ Client: deal.Client, ClientDealProposal: deal.ClientDealProposal, ProposalCid: deal.ProposalCid, State: deal.State, Ref: deal.Ref, DealID: deal.DealID, FastRetrieval: deal.FastRetrieval, PiecePath: filestore.Path(environment.FileStore().Filename(deal.PiecePath)), }, paddedSize, paddedReader, )if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err) }
调用 fsm 上下文对象的
Trigger
方法,发送事件。return ctx.Trigger(storagemarket.ProviderEventDealHandedOff)
当状态机收到这个事件后,经过事件处理器把状态从
StorageDealStaged
修改为StorageDealSealing
,从而调用其处理函数VerifyDealActivated
等待扇区密封结果。
9、`VerifyDealActivated` 函数
生成回调函数。
cb := func(err error) { if err != nil { _ = ctx.Trigger(storagemarket.ProviderEventDealActivationFailed, err) } else { _ = ctx.Trigger(storagemarket.ProviderEventDealActivated) } }
当 Lotus Provider 适配器对象检查到交易对象变化时会调用这个回调函数,从而发送相应的事件。
当状态机收到这个事件后,经过事件处理器把状态从
StorageDealSealing
修改为StorageDealActive
,从而调用其处理函数RecordPieceInfo
记录相关信息。调用 Lotus Provider 适配器对象的
OnDealSectorCommitted
方法,等待扇区被提交。err := environment.Node().OnDealSectorCommitted(ctx.Context(), deal.Proposal.Provider, deal.DealID, cb)if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealActivationFailed, err) }
返回空。
return nil
9、`RecordPieceInfo` 函数
这个函数主要记录相关信息。
最后调用 fsm 上下文对象的 Trigger
方法,通过事件处理生成一个事件对象,然后发送事件对象到状态机。此处生成的事件对象名称为 ProviderEventDealCompleted
。
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealActive
修改为 StorageDealCompleted
,最终结束状态机处理。
这里会删除碎片的临时文件。
本文链接:https://www.8btc.com/article/632253
转载请注明文章出处
声明:此文出于传递更多信息之目的,并不意味着赞同其观点或证实其描述。本网站所提供的信息,只供参考之用。
-
选择去中心化存储与Filecoin几大理由(上)
2022-05-18 filecoin -
【图说100问·Filecoin】第62问:什么是网络基线?
2021-04-04 filecoin -
Filecoin 中的信誉系统
2021-02-19 filecoin -
Filecoin上演大逃亡?投资者:冲高之后就开空
2020-10-15 filecoin -
千年鸽王 Filecoin 终于要上线!它其实是个国产项目
2020-10-15 filecoin