diff --git a/bindings/utils/state/megapool.go b/bindings/utils/state/megapool.go index 3154aeb5b..5d57cfe2e 100644 --- a/bindings/utils/state/megapool.go +++ b/bindings/utils/state/megapool.go @@ -1,20 +1,20 @@ package state import ( - "context" "fmt" "math/big" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/rocket-pool/smartnode/bindings/megapool" - "github.com/rocket-pool/smartnode/bindings/node" "github.com/rocket-pool/smartnode/bindings/rocketpool" + "github.com/rocket-pool/smartnode/bindings/utils/multicall" "golang.org/x/sync/errgroup" ) const ( - megapoolValidatorsBatchSize int = 1000 + megapoolValidatorsBatchSize int = 200 + megapoolBatchSize int = 100 ) type NativeMegapoolDetails struct { @@ -50,166 +50,256 @@ func (m *NativeMegapoolDetails) GetMegapoolBondNormalized() *big.Int { return big.NewInt(0).Div(m.NodeBond, big.NewInt(int64(m.ActiveValidatorCount))) } -// Get all megapool validators using the multicaller +// Get all megapool validators using batched multicalls func GetAllMegapoolValidators(rp *rocketpool.RocketPool, contracts *NetworkContracts) ([]megapool.ValidatorInfoFromGlobalIndex, error) { opts := &bind.CallOpts{ BlockNumber: contracts.ElBlockNumber, } - // Get megapool validators count + if contracts.Multicaller == nil { + return nil, fmt.Errorf("multicaller is nil") + } + if contracts.RocketMegapoolManager == nil { + return nil, fmt.Errorf("RocketMegapoolManager contract is nil") + } + + // Capture values from contracts before launching goroutines + multicallerAddress := contracts.Multicaller.ContractAddress + megapoolManagerContract := contracts.RocketMegapoolManager + megapoolValidatorsCount, err := megapool.GetValidatorCount(rp, opts) if err != nil { - return []megapool.ValidatorInfoFromGlobalIndex{}, err + return nil, err } - // Sync + count := int(megapoolValidatorsCount) + validators := make([]megapool.ValidatorInfoFromGlobalIndex, count) + var wg errgroup.Group wg.SetLimit(threadLimit) - validators := make([]megapool.ValidatorInfoFromGlobalIndex, megapoolValidatorsCount) - - // Run the getters in batches - count := int(megapoolValidatorsCount) for i := 0; i < count; i += megapoolValidatorsBatchSize { i := i - max := i + megapoolValidatorsBatchSize - if max > count { - max = count - } + max := min(i+megapoolValidatorsBatchSize, count) - for j := i; j < max; j++ { - j := j // Create a new variable `j` scoped to the loop iteration - validators[j], err = megapool.GetValidatorInfo(rp, uint32(j), opts) + wg.Go(func() error { + mc, err := multicall.NewMultiCaller(rp.Client, multicallerAddress) if err != nil { - return nil, fmt.Errorf("error executing GetValidatorInfo with global index %d", j) + return err } - } + var dummy *big.Int + for j := i; j < max; j++ { + mc.AddCall(megapoolManagerContract, &dummy, "getValidatorInfo", big.NewInt(int64(j))) + } + responses, err := mc.Execute(true, opts) + if err != nil { + return fmt.Errorf("error executing megapool validator multicall: %w", err) + } + for idx, response := range responses { + if !response.Status { + return fmt.Errorf("megapool validator call failed for global index %d", i+idx) + } + validators[i+idx], err = unpackValidatorInfoFromGlobalIndex(megapoolManagerContract, response.ReturnDataRaw) + if err != nil { + return fmt.Errorf("error unpacking validator info for global index %d: %w", i+idx, err) + } + } + return nil + }) + } + if err := wg.Wait(); err != nil { + return nil, fmt.Errorf("error getting megapool validators: %w", err) } return validators, nil } -func GetNodeMegapoolDetails(rp *rocketpool.RocketPool, nodeAccount common.Address, opts *bind.CallOpts) (NativeMegapoolDetails, error) { - - megapoolAddress, err := megapool.GetMegapoolExpectedAddress(rp, nodeAccount, nil) +// Manually unpack a getValidatorInfo response (nested structs don't work with UnpackIntoInterface) +func unpackValidatorInfoFromGlobalIndex(contract *rocketpool.Contract, data []byte) (megapool.ValidatorInfoFromGlobalIndex, error) { + iface, err := contract.ABI.Unpack("getValidatorInfo", data) if err != nil { - return NativeMegapoolDetails{}, err + return megapool.ValidatorInfoFromGlobalIndex{}, err } - // Sync - var wg errgroup.Group - details := NativeMegapoolDetails{Address: megapoolAddress} + src := iface[1].(struct { + LastAssignmentTime uint32 `json:"lastAssignmentTime"` + LastRequestedValue uint32 `json:"lastRequestedValue"` + LastRequestedBond uint32 `json:"lastRequestedBond"` + DepositValue uint32 `json:"depositValue"` - // Return if megapool isn't deployed - details.Deployed, err = megapool.GetMegapoolDeployed(rp, nodeAccount, opts) - if err != nil { - return NativeMegapoolDetails{}, err + Staked bool `json:"staked"` + Exited bool `json:"exited"` + InQueue bool `json:"inQueue"` + InPrestake bool `json:"inPrestake"` + ExpressUsed bool `json:"expressUsed"` + Dissolved bool `json:"dissolved"` + Exiting bool `json:"exiting"` + Locked bool `json:"locked"` + + ExitBalance uint64 `json:"exitBalance"` + LockedTime uint64 `json:"lockedTime"` + }) + + var validator megapool.ValidatorInfoFromGlobalIndex + validator.Pubkey = iface[0].([]byte) + validator.ValidatorInfo.LastAssignmentTime = src.LastAssignmentTime + validator.ValidatorInfo.LastRequestedValue = src.LastRequestedValue + validator.ValidatorInfo.LastRequestedBond = src.LastRequestedBond + validator.ValidatorInfo.DepositValue = src.DepositValue + validator.ValidatorInfo.Staked = src.Staked + validator.ValidatorInfo.Exited = src.Exited + validator.ValidatorInfo.InQueue = src.InQueue + validator.ValidatorInfo.InPrestake = src.InPrestake + validator.ValidatorInfo.ExpressUsed = src.ExpressUsed + validator.ValidatorInfo.Dissolved = src.Dissolved + validator.ValidatorInfo.Exiting = src.Exiting + validator.ValidatorInfo.Locked = src.Locked + validator.ValidatorInfo.ExitBalance = src.ExitBalance + validator.ValidatorInfo.LockedTime = src.LockedTime + validator.MegapoolAddress = iface[2].(common.Address) + validator.ValidatorId = iface[3].(uint32) + + return validator, nil +} + +// Get multiple megapool details at once using batched multicalls +func GetBulkMegapoolDetails(rp *rocketpool.RocketPool, contracts *NetworkContracts, megapoolAddresses []common.Address) (map[common.Address]NativeMegapoolDetails, error) { + opts := &bind.CallOpts{ + BlockNumber: contracts.ElBlockNumber, } - if !details.Deployed { - return details, nil + + count := len(megapoolAddresses) + result := make(map[common.Address]NativeMegapoolDetails, count) + if count == 0 { + return result, nil } - // Load the megapool contract - mega, err := megapool.NewMegaPoolV1(rp, megapoolAddress, nil) - if err != nil { - return NativeMegapoolDetails{}, err + if contracts.Multicaller == nil { + return nil, fmt.Errorf("multicaller is nil") } - details.EffectiveDelegateAddress, err = mega.GetEffectiveDelegate(opts) + // Capture values from contracts before launching goroutines + multicallerAddress := contracts.Multicaller.ContractAddress + megapoolFactory := contracts.RocketMegapoolFactory + nodeDeposit := contracts.RocketNodeDeposit + + megapoolDetails := make([]NativeMegapoolDetails, count) + lastDistributionTimes := make([]*big.Int, count) + delegateExpiries := make([]*big.Int, count) + + // Get all ETH balances in a single batch + balances, err := contracts.BalanceBatcher.GetEthBalances(megapoolAddresses, opts) if err != nil { - return NativeMegapoolDetails{}, err + return nil, fmt.Errorf("error getting megapool balances: %w", err) } - details.DelegateAddress, err = mega.GetDelegate(opts) - if err != nil { - return NativeMegapoolDetails{}, err + + // Create megapool contract bindings (ABI is cached after the first call) + megaContracts := make([]*rocketpool.Contract, count) + for i, addr := range megapoolAddresses { + megapoolDetails[i].Address = addr + megapoolDetails[i].Deployed = true + megapoolDetails[i].EthBalance = balances[i] + + mega, err := megapool.NewMegaPoolV1(rp, addr, nil) + if err != nil { + return nil, fmt.Errorf("error creating megapool contract for %s: %w", addr.Hex(), err) + } + megaContracts[i] = mega.GetContract() } - // Return if delegate is expired - details.DelegateExpired, err = mega.GetDelegateExpired(rp, opts) - if err != nil { - return NativeMegapoolDetails{}, err + // Round 1: all independent fields from every megapool in batched multicalls + var wg errgroup.Group + wg.SetLimit(threadLimit) + for i := 0; i < count; i += megapoolBatchSize { + i := i + max := min(i+megapoolBatchSize, count) + + wg.Go(func() error { + mc, err := multicall.NewMultiCaller(rp.Client, multicallerAddress) + if err != nil { + return err + } + for j := i; j < max; j++ { + addMegapoolDetailsCalls(mc, megaContracts[j], &megapoolDetails[j], &lastDistributionTimes[j]) + } + _, err = mc.FlexibleCall(true, opts) + if err != nil { + return fmt.Errorf("error executing megapool r1 multicall: %w", err) + } + return nil + }) } - if details.DelegateExpired { - return details, nil + if err := wg.Wait(); err != nil { + return nil, fmt.Errorf("error getting megapool details r1: %w", err) } - details.LastDistributionTime, err = mega.GetLastDistributionTime(opts) - if err != nil { - return NativeMegapoolDetails{}, err + // Convert intermediate big.Int values from round 1 + for i := range megapoolDetails { + if lastDistributionTimes[i] != nil { + megapoolDetails[i].LastDistributionTime = lastDistributionTimes[i].Uint64() + } } - wg.Go(func() error { - var err error - details.NodeDebt, err = mega.GetDebt(opts) - return err - }) - wg.Go(func() error { - var err error - details.PendingRewards, err = mega.GetPendingRewards(opts) - return err - }) - wg.Go(func() error { - var err error - details.RefundValue, err = mega.GetRefundValue(opts) - return err - }) - wg.Go(func() error { - var err error - details.ValidatorCount, err = mega.GetValidatorCount(opts) - return err - }) - wg.Go(func() error { - var err error - details.ActiveValidatorCount, err = mega.GetActiveValidatorCount(opts) - return err - }) - wg.Go(func() error { - var err error - details.LockedValidatorCount, err = mega.GetLockedValidatorCount(opts) - return err - }) - wg.Go(func() error { - var err error - details.UseLatestDelegate, err = mega.GetUseLatestDelegate(opts) - return err - }) - wg.Go(func() error { - var err error - details.DelegateExpiry, err = megapool.GetMegapoolDelegateExpiry(rp, details.DelegateAddress, opts) - return err - }) - wg.Go(func() error { - var err error - details.AssignedValue, err = mega.GetAssignedValue(opts) - return err - }) - wg.Go(func() error { - var err error - details.NodeBond, err = mega.GetNodeBond(opts) - return err - }) - wg.Go(func() error { - var err error - details.UserCapital, err = mega.GetUserCapital(opts) - return err - }) - wg.Go(func() error { - var err error - details.EthBalance, err = rp.Client.BalanceAt(context.Background(), details.Address, opts.BlockNumber) - return err - }) - wg.Go(func() error { - var err error - details.NodeQueuedBond, err = mega.GetNodeQueuedBond(opts) - return err - }) - // Wait for data - if err := wg.Wait(); err != nil { - return details, err + // Round 2: dependent fields (delegate expiry from DelegateAddress, bond requirement from ActiveValidatorCount) + var wg2 errgroup.Group + wg2.SetLimit(threadLimit) + for i := 0; i < count; i += megapoolBatchSize { + i := i + max := min(i+megapoolBatchSize, count) + + wg2.Go(func() error { + mc, err := multicall.NewMultiCaller(rp.Client, multicallerAddress) + if err != nil { + return err + } + callCount := 0 + for j := i; j < max; j++ { + if megapoolDetails[j].DelegateExpired { + continue + } + mc.AddCall(megapoolFactory, &delegateExpiries[j], "getDelegateExpiry", megapoolDetails[j].DelegateAddress) + mc.AddCall(nodeDeposit, &megapoolDetails[j].BondRequirement, "getBondRequirement", big.NewInt(int64(megapoolDetails[j].ActiveValidatorCount))) + callCount += 2 + } + if callCount == 0 { + return nil + } + _, err = mc.FlexibleCall(true, opts) + if err != nil { + return fmt.Errorf("error executing megapool r2 multicall: %w", err) + } + return nil + }) + } + if err := wg2.Wait(); err != nil { + return nil, fmt.Errorf("error getting megapool details r2: %w", err) } - details.BondRequirement, err = node.GetBondRequirement(rp, big.NewInt(int64(details.ActiveValidatorCount)), opts) - if err != nil { - return details, err + // Convert intermediate values and build result map + for i := range megapoolDetails { + if delegateExpiries[i] != nil { + megapoolDetails[i].DelegateExpiry = delegateExpiries[i].Uint64() + } + result[megapoolDetails[i].Address] = megapoolDetails[i] } - return details, nil + + return result, nil +} + +// Add all independent multicall entries for a single megapool's details +func addMegapoolDetailsCalls(mc *multicall.MultiCaller, megaContract *rocketpool.Contract, details *NativeMegapoolDetails, lastDistributionTime **big.Int) { + mc.AddCall(megaContract, &details.EffectiveDelegateAddress, "getEffectiveDelegate") + mc.AddCall(megaContract, &details.DelegateAddress, "getDelegate") + mc.AddCall(megaContract, &details.DelegateExpired, "getDelegateExpired") + mc.AddCall(megaContract, lastDistributionTime, "getLastDistributionTime") + mc.AddCall(megaContract, &details.NodeDebt, "getDebt") + mc.AddCall(megaContract, &details.PendingRewards, "getPendingRewards") + mc.AddCall(megaContract, &details.RefundValue, "getRefundValue") + mc.AddCall(megaContract, &details.ValidatorCount, "getValidatorCount") + mc.AddCall(megaContract, &details.ActiveValidatorCount, "getActiveValidatorCount") + mc.AddCall(megaContract, &details.LockedValidatorCount, "getLockedValidatorCount") + mc.AddCall(megaContract, &details.UseLatestDelegate, "getUseLatestDelegate") + mc.AddCall(megaContract, &details.AssignedValue, "getAssignedValue") + mc.AddCall(megaContract, &details.NodeBond, "getNodeBond") + mc.AddCall(megaContract, &details.UserCapital, "getUserCapital") + mc.AddCall(megaContract, &details.NodeQueuedBond, "getNodeQueuedBond") } diff --git a/rocketpool/api/network/stats.go b/rocketpool/api/network/stats.go index 3c1c12e0e..3d7037d52 100644 --- a/rocketpool/api/network/stats.go +++ b/rocketpool/api/network/stats.go @@ -198,10 +198,17 @@ func getStats(c *cli.Context) (*api.NetworkStatsResponse, error) { megapoolAddressSet := make(map[common.Address]bool) // Fetch the global megapool validator index - contracts := rpstate.NetworkContracts{ - ElBlockNumber: nil, + cfg, err := services.GetConfig(c) + if err != nil { + return fmt.Errorf("error getting config: %w", err) + } + multicallerAddress := common.HexToAddress(cfg.Smartnode.GetMulticallAddress()) + balanceBatcherAddress := common.HexToAddress(cfg.Smartnode.GetBalanceBatcherAddress()) + contracts, err := rpstate.NewNetworkContracts(rp, saturnDeployed, multicallerAddress, balanceBatcherAddress, nil) + if err != nil { + return fmt.Errorf("error creating network contracts: %w", err) } - megapoolValidators, err := rpstate.GetAllMegapoolValidators(rp, &contracts) + megapoolValidators, err := rpstate.GetAllMegapoolValidators(rp, contracts) if err != nil { return fmt.Errorf("error getting all megapool validator details: %w", err) } diff --git a/shared/services/state/network-state.go b/shared/services/state/network-state.go index 90ca1208b..f6ec26d11 100644 --- a/shared/services/state/network-state.go +++ b/shared/services/state/network-state.go @@ -262,55 +262,47 @@ func (m *NetworkStateManager) createNetworkState(slotNumber uint64) (*NetworkSta if err != nil { return nil, fmt.Errorf("error getting all megapool validator details: %w", err) } + m.logLine("4/7 - Retrieved megapool validator global index (%s so far)", time.Since(start)) + megapoolValidatorPubkeys := make([]types.ValidatorPubkey, 0, len(state.MegapoolValidatorGlobalIndex)) - // Iterate over the megapool validators to add their pubkey to the list of pubkeys megapoolAddressMap := make(map[common.Address][]types.ValidatorPubkey) megapoolValidatorInfo := make(map[types.ValidatorPubkey]*megapool.ValidatorInfoFromGlobalIndex) - for _, validator := range state.MegapoolValidatorGlobalIndex { - // Add the megapool address to a set - if len(validator.Pubkey) > 0 { // TODO CHECK validators without a pubkey - megapoolAddressMap[validator.MegapoolAddress] = append(megapoolAddressMap[validator.MegapoolAddress], types.ValidatorPubkey(validator.Pubkey)) - megapoolValidatorPubkeys = append(megapoolValidatorPubkeys, types.ValidatorPubkey(validator.Pubkey)) - megapoolValidatorInfo[types.ValidatorPubkey(validator.Pubkey)] = &validator + for i := range state.MegapoolValidatorGlobalIndex { + validator := &state.MegapoolValidatorGlobalIndex[i] + if len(validator.Pubkey) > 0 { + pubkey := types.ValidatorPubkey(validator.Pubkey) + megapoolAddressMap[validator.MegapoolAddress] = append(megapoolAddressMap[validator.MegapoolAddress], pubkey) + megapoolValidatorPubkeys = append(megapoolValidatorPubkeys, pubkey) + megapoolValidatorInfo[pubkey] = validator } } state.MegapoolToPubkeysMap = megapoolAddressMap - statusMap, err := m.bc.GetValidatorStatuses(megapoolValidatorPubkeys, &beacon.ValidatorStatusOptions{ - Slot: &slotNumber, - }) - if err != nil { - return nil, err - } - state.MegapoolValidatorDetails = statusMap state.MegapoolValidatorInfo = megapoolValidatorInfo - // initialize state.MegapoolDetails - state.MegapoolDetails = make(map[common.Address]rpstate.NativeMegapoolDetails) - // Sync - var wg errgroup.Group - // Iterate the maps and query megapool details - for megapoolAddress := range megapoolAddressMap { - wg.Go(func() error { - - // Load the megapool - mp, err := megapool.NewMegaPoolV1(m.rp, megapoolAddress, opts) - if err != nil { - return err - } - nodeAddress, err := mp.GetNodeAddress(opts) - if err != nil { - return err - } - megapoolDetails, err := rpstate.GetNodeMegapoolDetails(m.rp, nodeAddress, opts) - if err != nil { - return err - } - state.MegapoolDetails[megapoolAddress] = megapoolDetails - return nil + megapoolAddresses := make([]common.Address, 0, len(megapoolAddressMap)) + for addr := range megapoolAddressMap { + megapoolAddresses = append(megapoolAddresses, addr) + } + + // Fetch beacon validator statuses and EL megapool details in parallel + var megapoolWg errgroup.Group + megapoolWg.Go(func() error { + statusMap, err := m.bc.GetValidatorStatuses(megapoolValidatorPubkeys, &beacon.ValidatorStatusOptions{ + Slot: &slotNumber, }) - if err := wg.Wait(); err != nil { - return nil, fmt.Errorf("error getting all megapool details: %w", err) + if err != nil { + return err } + state.MegapoolValidatorDetails = statusMap + return nil + }) + megapoolWg.Go(func() error { + var err error + state.MegapoolDetails, err = rpstate.GetBulkMegapoolDetails(m.rp, contracts, megapoolAddresses) + return err + }) + if err := megapoolWg.Wait(); err != nil { + return nil, fmt.Errorf("error getting megapool details: %w", err) } m.logLine("4/7 - Retrieved megapool validator details (%s so far)", time.Since(start)) } @@ -502,58 +494,46 @@ func (m *NetworkStateManager) createNetworkStateForNode(slotNumber uint64, nodeA if err != nil { return nil, fmt.Errorf("error getting all megapool validator details: %w", err) } + megapoolValidatorPubkeys := make([]types.ValidatorPubkey, 0, len(state.MegapoolValidatorGlobalIndex)) - // Iterate over the megapool validators to add their pubkey to the list of pubkeys megapoolAddressMap := make(map[common.Address][]types.ValidatorPubkey) megapoolValidatorInfo := make(map[types.ValidatorPubkey]*megapool.ValidatorInfoFromGlobalIndex) - for _, validator := range state.MegapoolValidatorGlobalIndex { - // Add the megapool address to a set - if len(validator.Pubkey) > 0 { // TODO CHECK validators without a pubkey - megapoolAddressMap[validator.MegapoolAddress] = append(megapoolAddressMap[validator.MegapoolAddress], types.ValidatorPubkey(validator.Pubkey)) - megapoolValidatorPubkeys = append(megapoolValidatorPubkeys, types.ValidatorPubkey(validator.Pubkey)) - megapoolValidatorInfo[types.ValidatorPubkey(validator.Pubkey)] = &validator + for i := range state.MegapoolValidatorGlobalIndex { + validator := &state.MegapoolValidatorGlobalIndex[i] + if len(validator.Pubkey) > 0 { + pubkey := types.ValidatorPubkey(validator.Pubkey) + megapoolAddressMap[validator.MegapoolAddress] = append(megapoolAddressMap[validator.MegapoolAddress], pubkey) + megapoolValidatorPubkeys = append(megapoolValidatorPubkeys, pubkey) + megapoolValidatorInfo[pubkey] = validator } } state.MegapoolToPubkeysMap = megapoolAddressMap - statusMap, err := m.bc.GetValidatorStatuses(megapoolValidatorPubkeys, &beacon.ValidatorStatusOptions{ - Slot: &slotNumber, - }) - if err != nil { - return nil, err - } - state.MegapoolValidatorDetails = statusMap state.MegapoolValidatorInfo = megapoolValidatorInfo - // initialize state.MegapoolDetails - state.MegapoolDetails = make(map[common.Address]rpstate.NativeMegapoolDetails) - // Sync - var wg errgroup.Group - // Iterate the maps and query megapool details - for megapoolAddress := range megapoolAddressMap { - - megapoolAddress := megapoolAddress - wg.Go(func() error { - - // Load the megapool - mp, err := megapool.NewMegaPoolV1(m.rp, megapoolAddress, opts) - if err != nil { - return err - } - nodeAddress, err := mp.GetNodeAddress(opts) - if err != nil { - return err - } - megapoolDetails, err := rpstate.GetNodeMegapoolDetails(m.rp, nodeAddress, opts) - if err != nil { - return err - } - - state.MegapoolDetails[megapoolAddress] = megapoolDetails - return nil + megapoolAddresses := make([]common.Address, 0, len(megapoolAddressMap)) + for addr := range megapoolAddressMap { + megapoolAddresses = append(megapoolAddresses, addr) + } + + // Fetch beacon validator statuses and EL megapool details in parallel + var megapoolWg errgroup.Group + megapoolWg.Go(func() error { + statusMap, err := m.bc.GetValidatorStatuses(megapoolValidatorPubkeys, &beacon.ValidatorStatusOptions{ + Slot: &slotNumber, }) - if err := wg.Wait(); err != nil { - return nil, fmt.Errorf("error getting all megapool details: %w", err) + if err != nil { + return err } + state.MegapoolValidatorDetails = statusMap + return nil + }) + megapoolWg.Go(func() error { + var err error + state.MegapoolDetails, err = rpstate.GetBulkMegapoolDetails(m.rp, contracts, megapoolAddresses) + return err + }) + if err := megapoolWg.Wait(); err != nil { + return nil, fmt.Errorf("error getting megapool details: %w", err) } m.logLine("%d/%d - Retrieved megapool validator details (total time: %s)", currentStep, steps, time.Since(start)) }