Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions core/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ package scheduler

import (
"context"
"fmt"
"math"
"slices"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -518,6 +520,8 @@ func (s *Scheduler) resolveAttDuties(ctx context.Context, slot core.Slot, vals v
return 0
})

var resolvedPubkeys []string

for _, attDuty := range attDuties {
delete(remaining, attDuty.ValidatorIndex)

Expand All @@ -542,12 +546,7 @@ func (s *Scheduler) resolveAttDuties(ctx context.Context, slot core.Slot, vals v
continue
}

log.Info(ctx, "Resolved attester duty",
z.U64("slot", uint64(attDuty.Slot)),
z.U64("vidx", uint64(attDuty.ValidatorIndex)),
z.Any("pubkey", pubkey),
z.U64("epoch", slot.Epoch()),
)
resolvedPubkeys = append(resolvedPubkeys, pubkey.String())

// Schedule aggregation duty as well.
aggDuty := core.NewAggregatorDuty(uint64(attDuty.Slot))
Expand All @@ -557,6 +556,8 @@ func (s *Scheduler) resolveAttDuties(ctx context.Context, slot core.Slot, vals v
}
}

logResolvedDuties(ctx, "Resolved attester duties", slot.Epoch(), resolvedPubkeys)

if len(remaining) > 0 {
log.Warn(ctx, "Missing attester duties from beacon node. Some validators did not receive duty assignments. Check beacon node sync status and validator activation", nil,
z.U64("slot", slot.Slot),
Expand Down Expand Up @@ -655,6 +656,8 @@ func (s *Scheduler) resolveSyncCommDuties(ctx context.Context, slot core.Slot, v
}
}

var syncResolvedPubkeys []string

for _, syncCommDuty := range duties {
vIdx := syncCommDuty.ValidatorIndex

Expand Down Expand Up @@ -682,13 +685,11 @@ func (s *Scheduler) resolveSyncCommDuties(ctx context.Context, slot core.Slot, v
s.setDutyDefinition(duty, slot.Epoch(), pubkey, core.NewSyncCommitteeDefinition(syncCommDuty))
}

log.Info(ctx, "Resolved sync committee duty",
z.U64("vidx", uint64(vIdx)),
z.Any("pubkey", pubkey),
z.U64("epoch", slot.Epoch()),
)
syncResolvedPubkeys = append(syncResolvedPubkeys, pubkey.String())
}

logResolvedDuties(ctx, "Resolved sync committee duties", slot.Epoch(), syncResolvedPubkeys)

return nil
}

Expand Down Expand Up @@ -1069,3 +1070,25 @@ func (v validators) Indexes() []eth2p0.ValidatorIndex {

return resp
}

// logResolvedDuties logs resolved duties with pubkeys chunked to stay within Loki's max log line length.
func logResolvedDuties(ctx context.Context, msg string, epoch uint64, pubkeys []string) {
if len(pubkeys) == 0 {
return
}

const chunkSize = 400 // ~8 bytes per pubkey, 400*8=3200 bytes leaves room for other fields within 4KB limit.

chunks := (len(pubkeys) + chunkSize - 1) / chunkSize
for i := range chunks {
start := i * chunkSize
end := min(start+chunkSize, len(pubkeys))

log.Info(ctx, msg,
z.Int("count", len(pubkeys)),
z.U64("epoch", epoch),
z.Str("chunk", fmt.Sprintf("%d/%d", i+1, chunks)),
z.Str("pubkeys", strings.Join(pubkeys[start:end], ",")),
)
}
}
4 changes: 3 additions & 1 deletion core/sigagg/sigagg.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func (a *Aggregator) Aggregate(ctx context.Context, duty core.Duty, set map[core

slotAggregationDuration.WithLabelValues(duty.Type.String()).Observe(time.Since(slotAggregateStart).Seconds())

log.Debug(ctx, "Successfully aggregated partial signatures to reach threshold")
log.Debug(ctx, "Successfully aggregated partial signatures to reach threshold",
z.Int("pubkeys", len(set)),
)

// Call subscriptions.
for _, sub := range a.subs {
Expand Down
Loading