Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions ethstorage/miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type worker struct {
db ethdb.Database
storageMgr *es.StorageManager

headUpdateCh chan uint64

chainHeadCh chan eth.L1BlockRef
startCh chan uint64
exitCh chan struct{}
Expand Down Expand Up @@ -155,6 +157,7 @@ func newWorker(
dataReader: dr,
prover: prover,
chainHeadCh: chainHeadCh,
headUpdateCh: make(chan uint64, 1),
shardTaskMap: make(map[uint64]task),
exitCh: make(chan struct{}),
startCh: make(chan uint64, 1),
Expand Down Expand Up @@ -275,6 +278,7 @@ func (w *worker) newWorkLoop() {
w.shardTaskMap[shardIdx] = task

case block := <-w.chainHeadCh:
w.updateLatestL1Head(block.Number)
if !w.isRunning() {
break
}
Expand All @@ -296,6 +300,20 @@ func (w *worker) newWorkLoop() {
}
}

func (w *worker) updateLatestL1Head(new uint64) {
for {
select {
case w.headUpdateCh <- new:
return
case old := <-w.headUpdateCh:
// Keep monotonic block height while replacing buffered value.
if old > new {
new = old
}
}
}
}

// assign tasks to threads with split nonce range
func (w *worker) assignTasks(task task, block eth.L1BlockRef, reqDiff *big.Int) {
seg := w.config.NonceLimit / w.config.ThreadsPerShard
Expand Down Expand Up @@ -426,12 +444,9 @@ func (w *worker) resultLoop() {
continue
}
w.lg.Info("Mining result loop get result", "shard", result.startShardId, "block", result.blockNumber, "nonce", result.nonce)

// Mining result comes within the same block time window
if tillNextSlot := int64(result.timestamp) + int64(w.config.Slot) - time.Now().Unix(); tillNextSlot > 0 {
// Wait until next block comes to avoid empty blockhash on gas estimation
w.lg.Info("Hold on submitting mining result till block+1", "block", result.blockNumber, "secondsToWait", tillNextSlot)
time.Sleep(time.Duration(tillNextSlot) * time.Second)
// Wait until next block comes to avoid empty blockhash on gas estimation
if !w.waitUntilBlockAdvanced(result.blockNumber.Uint64()) {
return
}
txHash, err := w.l1API.SubmitMinedResult(
context.Background(),
Expand Down Expand Up @@ -505,6 +520,22 @@ func (w *worker) resultLoop() {
}
}

// waitUntilBlockAdvanced waits until L1 head is strictly newer than the mined block.
func (w *worker) waitUntilBlockAdvanced(mined uint64) bool {
for {
select {
case latest := <-w.headUpdateCh:
if latest > mined {
w.lg.Info("L1 head advanced since mined block", "mined", mined, "latest", latest)
return true
}
w.lg.Info("L1 head not advanced since mined block, keep waiting", "mined", mined, "latest", latest)
case <-w.exitCh:
return false
}
}
}

func (w *worker) reportMiningResult(rs *result, txHash common.Hash, err error) {
msg := fmt.Sprintf(
"A storage proof was generated by es-node for shard %d at block %v.\r\n\r\n",
Expand Down
Loading