Skip to content

Commit

Permalink
Enables non-atomic validation for peer scoring parameters (#499)
Browse files Browse the repository at this point in the history
* decouples topic scoring parameters

* adds skiping atomic validation for topic parameters

* cleans up

* adds skip atomic validation to peer score threshold

* adds skip atomic validation for peer parameters

* adds test for non-atomic validation

* adds tests for peer score

* adds tests for peer score thresholds

* refactors tests
  • Loading branch information
yhassanzadeh13 authored Sep 8, 2022
1 parent 4f56e8f commit 8866ca8
Show file tree
Hide file tree
Showing 2 changed files with 689 additions and 151 deletions.
219 changes: 173 additions & 46 deletions score_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@ import (
)

type PeerScoreThresholds struct {
// GossipThreshold is the score threshold below which gossip propagation is supressed;
// whether it is allowed to just set some params and not all of them.
SkipAtomicValidation bool

// GossipThreshold is the score threshold below which gossip propagation is suppressed;
// should be negative.
GossipThreshold float64

// PublishThreshold is the score threshold below which we shouldn't publish when using flood
// publishing (also applies to fanout and floodsub peers); should be negative and <= GossipThreshold.
PublishThreshold float64

// GraylistThreshold is the score threshold below which message processing is supressed altogether,
// implementing an effective graylist according to peer score; should be negative and <= PublisThreshold.
// GraylistThreshold is the score threshold below which message processing is suppressed altogether,
// implementing an effective gray list according to peer score; should be negative and <= PublishThreshold.
GraylistThreshold float64

// AcceptPXThreshold is the score threshold below which PX will be ignored; this should be positive
Expand All @@ -32,25 +35,38 @@ type PeerScoreThresholds struct {
}

func (p *PeerScoreThresholds) validate() error {
if p.GossipThreshold > 0 || isInvalidNumber(p.GossipThreshold) {
return fmt.Errorf("invalid gossip threshold; it must be <= 0 and a valid number")
}
if p.PublishThreshold > 0 || p.PublishThreshold > p.GossipThreshold || isInvalidNumber(p.PublishThreshold) {
return fmt.Errorf("invalid publish threshold; it must be <= 0 and <= gossip threshold and a valid number")
}
if p.GraylistThreshold > 0 || p.GraylistThreshold > p.PublishThreshold || isInvalidNumber(p.GraylistThreshold) {
return fmt.Errorf("invalid graylist threshold; it must be <= 0 and <= publish threshold and a valid number")

if !p.SkipAtomicValidation || p.PublishThreshold != 0 || p.GossipThreshold != 0 || p.GraylistThreshold != 0 {
if p.GossipThreshold > 0 || isInvalidNumber(p.GossipThreshold) {
return fmt.Errorf("invalid gossip threshold; it must be <= 0 and a valid number")
}
if p.PublishThreshold > 0 || p.PublishThreshold > p.GossipThreshold || isInvalidNumber(p.PublishThreshold) {
return fmt.Errorf("invalid publish threshold; it must be <= 0 and <= gossip threshold and a valid number")
}
if p.GraylistThreshold > 0 || p.GraylistThreshold > p.PublishThreshold || isInvalidNumber(p.GraylistThreshold) {
return fmt.Errorf("invalid graylist threshold; it must be <= 0 and <= publish threshold and a valid number")
}
}
if p.AcceptPXThreshold < 0 || isInvalidNumber(p.AcceptPXThreshold) {
return fmt.Errorf("invalid accept PX threshold; it must be >= 0 and a valid number")

if !p.SkipAtomicValidation || p.AcceptPXThreshold != 0 {
if p.AcceptPXThreshold < 0 || isInvalidNumber(p.AcceptPXThreshold) {
return fmt.Errorf("invalid accept PX threshold; it must be >= 0 and a valid number")
}
}
if p.OpportunisticGraftThreshold < 0 || isInvalidNumber(p.OpportunisticGraftThreshold) {
return fmt.Errorf("invalid opportunistic grafting threshold; it must be >= 0 and a valid number")

if !p.SkipAtomicValidation || p.OpportunisticGraftThreshold != 0 {
if p.OpportunisticGraftThreshold < 0 || isInvalidNumber(p.OpportunisticGraftThreshold) {
return fmt.Errorf("invalid opportunistic grafting threshold; it must be >= 0 and a valid number")
}
}

return nil
}

type PeerScoreParams struct {
// whether it is allowed to just set some params and not all of them.
SkipAtomicValidation bool

// Score parameters per topic.
Topics map[string]*TopicScoreParams

Expand Down Expand Up @@ -99,12 +115,15 @@ type PeerScoreParams struct {
}

type TopicScoreParams struct {
// whether it is allowed to just set some params and not all of them.
SkipAtomicValidation bool

// The weight of the topic.
TopicWeight float64

// P1: time in the mesh
// This is the time the peer has ben grafted in the mesh.
// The value of of the parameter is the time/TimeInMeshQuantum, capped by TimeInMeshCap
// This is the time the peer has been grafted in the mesh.
// The value of the parameter is the time/TimeInMeshQuantum, capped by TimeInMeshCap.
// The weight of the parameter MUST be positive (or zero to disable).
TimeInMeshWeight float64
TimeInMeshQuantum time.Duration
Expand All @@ -124,7 +143,7 @@ type TopicScoreParams struct {
// when validation succeeds.
// This window accounts for the minimum time before a hostile mesh peer trying to game the score
// could replay back a valid message we just sent them.
// It effectively tracks first and near-first deliveries, ie a message seen from a mesh peer
// It effectively tracks first and near-first deliveries, i.e., a message seen from a mesh peer
// before we have forwarded it to them.
// The parameter has an associated counter, decaying with MeshMessageDeliveriesDecay.
// If the counter exceeds the threshold, its value is 0.
Expand Down Expand Up @@ -159,41 +178,55 @@ func (p *PeerScoreParams) validate() error {
}
}

// check that the topic score is 0 or something positive
if p.TopicScoreCap < 0 || isInvalidNumber(p.TopicScoreCap) {
return fmt.Errorf("invalid topic score cap; must be positive (or 0 for no cap) and a valid number")
if !p.SkipAtomicValidation || p.TopicScoreCap != 0 {
// check that the topic score is 0 or something positive
if p.TopicScoreCap < 0 || isInvalidNumber(p.TopicScoreCap) {
return fmt.Errorf("invalid topic score cap; must be positive (or 0 for no cap) and a valid number")
}
}

// check that we have an app specific score; the weight can be anything (but expected positive)
if p.AppSpecificScore == nil {
return fmt.Errorf("missing application specific score function")
if p.SkipAtomicValidation {
p.AppSpecificScore = func(p peer.ID) float64 {
return 0
}
} else {
return fmt.Errorf("missing application specific score function")
}
}

// check the IP colocation factor
if p.IPColocationFactorWeight > 0 || isInvalidNumber(p.IPColocationFactorWeight) {
return fmt.Errorf("invalid IPColocationFactorWeight; must be negative (or 0 to disable) and a valid number")
}
if p.IPColocationFactorWeight != 0 && p.IPColocationFactorThreshold < 1 {
return fmt.Errorf("invalid IPColocationFactorThreshold; must be at least 1")
if !p.SkipAtomicValidation || p.IPColocationFactorWeight != 0 {
// check the IP collocation factor
if p.IPColocationFactorWeight > 0 || isInvalidNumber(p.IPColocationFactorWeight) {
return fmt.Errorf("invalid IPColocationFactorWeight; must be negative (or 0 to disable) and a valid number")
}
if p.IPColocationFactorWeight != 0 && p.IPColocationFactorThreshold < 1 {
return fmt.Errorf("invalid IPColocationFactorThreshold; must be at least 1")
}
}

// check the behaviour penalty
if p.BehaviourPenaltyWeight > 0 || isInvalidNumber(p.BehaviourPenaltyWeight) {
return fmt.Errorf("invalid BehaviourPenaltyWeight; must be negative (or 0 to disable) and a valid number")
}
if p.BehaviourPenaltyWeight != 0 && (p.BehaviourPenaltyDecay <= 0 || p.BehaviourPenaltyDecay >= 1 || isInvalidNumber(p.BehaviourPenaltyDecay)) {
return fmt.Errorf("invalid BehaviourPenaltyDecay; must be between 0 and 1")
}
if p.BehaviourPenaltyThreshold < 0 || isInvalidNumber(p.BehaviourPenaltyThreshold) {
return fmt.Errorf("invalid BehaviourPenaltyThreshold; must be >= 0 and a valid number")
if !p.SkipAtomicValidation || p.BehaviourPenaltyWeight != 0 || p.BehaviourPenaltyThreshold != 0 {
if p.BehaviourPenaltyWeight > 0 || isInvalidNumber(p.BehaviourPenaltyWeight) {
return fmt.Errorf("invalid BehaviourPenaltyWeight; must be negative (or 0 to disable) and a valid number")
}
if p.BehaviourPenaltyWeight != 0 && (p.BehaviourPenaltyDecay <= 0 || p.BehaviourPenaltyDecay >= 1 || isInvalidNumber(p.BehaviourPenaltyDecay)) {
return fmt.Errorf("invalid BehaviourPenaltyDecay; must be between 0 and 1")
}
if p.BehaviourPenaltyThreshold < 0 || isInvalidNumber(p.BehaviourPenaltyThreshold) {
return fmt.Errorf("invalid BehaviourPenaltyThreshold; must be >= 0 and a valid number")
}
}

// check the decay parameters
if p.DecayInterval < time.Second {
return fmt.Errorf("invalid DecayInterval; must be at least 1s")
}
if p.DecayToZero <= 0 || p.DecayToZero >= 1 || isInvalidNumber(p.DecayToZero) {
return fmt.Errorf("invalid DecayToZero; must be between 0 and 1")
if !p.SkipAtomicValidation || p.DecayInterval != 0 || p.DecayToZero != 0 {
if p.DecayInterval < time.Second {
return fmt.Errorf("invalid DecayInterval; must be at least 1s")
}
if p.DecayToZero <= 0 || p.DecayToZero >= 1 || isInvalidNumber(p.DecayToZero) {
return fmt.Errorf("invalid DecayToZero; must be between 0 and 1")
}
}

// no need to check the score retention; a value of 0 means that we don't retain scores
Expand All @@ -207,6 +240,43 @@ func (p *TopicScoreParams) validate() error {
}

// check P1
if err := p.validateTimeInMeshParams(); err != nil {
return err
}

// check P2
if err := p.validateMessageDeliveryParams(); err != nil {
return err
}
// check P3
if err := p.validateMeshMessageDeliveryParams(); err != nil {
return err
}

// check P3b
if err := p.validateMessageFailurePenaltyParams(); err != nil {
return err
}

// check P4
if err := p.validateInvalidMessageDeliveryParams(); err != nil {
return err
}

return nil
}

func (p *TopicScoreParams) validateTimeInMeshParams() error {
if p.SkipAtomicValidation {
// in non-atomic mode, parameters at their zero values are dismissed from validation.
if p.TimeInMeshWeight == 0 && p.TimeInMeshQuantum == 0 && p.TimeInMeshCap == 0 {
return nil
}
}

// either atomic validation mode, or some parameters have been set a value,
// hence, proceed with normal validation of all related parameters in this context.

if p.TimeInMeshQuantum == 0 {
return fmt.Errorf("invalid TimeInMeshQuantum; must be non zero")
}
Expand All @@ -220,7 +290,20 @@ func (p *TopicScoreParams) validate() error {
return fmt.Errorf("invalid TimeInMeshCap; must be positive and a valid number")
}

// check P2
return nil
}

func (p *TopicScoreParams) validateMessageDeliveryParams() error {
if p.SkipAtomicValidation {
// in non-atomic mode, parameters at their zero values are dismissed from validation.
if p.FirstMessageDeliveriesWeight == 0 && p.FirstMessageDeliveriesCap == 0 && p.FirstMessageDeliveriesDecay == 0 {
return nil
}
}

// either atomic validation mode, or some parameters have been set a value,
// hence, proceed with normal validation of all related parameters in this context.

if p.FirstMessageDeliveriesWeight < 0 || isInvalidNumber(p.FirstMessageDeliveriesWeight) {
return fmt.Errorf("invallid FirstMessageDeliveriesWeight; must be positive (or 0 to disable) and a valid number")
}
Expand All @@ -231,7 +314,25 @@ func (p *TopicScoreParams) validate() error {
return fmt.Errorf("invalid FirstMessageDeliveriesCap; must be positive and a valid number")
}

// check P3
return nil
}

func (p *TopicScoreParams) validateMeshMessageDeliveryParams() error {
if p.SkipAtomicValidation {
// in non-atomic mode, parameters at their zero values are dismissed from validation.
if p.MeshMessageDeliveriesWeight == 0 &&
p.MeshMessageDeliveriesCap == 0 &&
p.MeshMessageDeliveriesDecay == 0 &&
p.MeshMessageDeliveriesThreshold == 0 &&
p.MeshMessageDeliveriesWindow == 0 &&
p.MeshMessageDeliveriesActivation == 0 {
return nil
}
}

// either atomic validation mode, or some parameters have been set a value,
// hence, proceed with normal validation of all related parameters in this context.

if p.MeshMessageDeliveriesWeight > 0 || isInvalidNumber(p.MeshMessageDeliveriesWeight) {
return fmt.Errorf("invalid MeshMessageDeliveriesWeight; must be negative (or 0 to disable) and a valid number")
}
Expand All @@ -251,15 +352,41 @@ func (p *TopicScoreParams) validate() error {
return fmt.Errorf("invalid MeshMessageDeliveriesActivation; must be at least 1s")
}

// check P3b
return nil
}

func (p *TopicScoreParams) validateMessageFailurePenaltyParams() error {
if p.SkipAtomicValidation {
// in selective mode, parameters at their zero values are dismissed from validation.
if p.MeshFailurePenaltyDecay == 0 && p.MeshFailurePenaltyWeight == 0 {
return nil
}
}

// either atomic validation mode, or some parameters have been set a value,
// hence, proceed with normal validation of all related parameters in this context.

if p.MeshFailurePenaltyWeight > 0 || isInvalidNumber(p.MeshFailurePenaltyWeight) {
return fmt.Errorf("invalid MeshFailurePenaltyWeight; must be negative (or 0 to disable) and a valid number")
}
if p.MeshFailurePenaltyWeight != 0 && (isInvalidNumber(p.MeshFailurePenaltyDecay) || p.MeshFailurePenaltyDecay <= 0 || p.MeshFailurePenaltyDecay >= 1) {
return fmt.Errorf("invalid MeshFailurePenaltyDecay; must be between 0 and 1")
}

// check P4
return nil
}

func (p *TopicScoreParams) validateInvalidMessageDeliveryParams() error {
if p.SkipAtomicValidation {
// in selective mode, parameters at their zero values are dismissed from validation.
if p.InvalidMessageDeliveriesDecay == 0 && p.InvalidMessageDeliveriesWeight == 0 {
return nil
}
}

// either atomic validation mode, or some parameters have been set a value,
// hence, proceed with normal validation of all related parameters in this context.

if p.InvalidMessageDeliveriesWeight > 0 || isInvalidNumber(p.InvalidMessageDeliveriesWeight) {
return fmt.Errorf("invalid InvalidMessageDeliveriesWeight; must be negative (or 0 to disable) and a valid number")
}
Expand All @@ -281,7 +408,7 @@ func ScoreParameterDecay(decay time.Duration) float64 {
return ScoreParameterDecayWithBase(decay, DefaultDecayInterval, DefaultDecayToZero)
}

// ScoreParameterDecay computes the decay factor for a parameter using base as the DecayInterval
// ScoreParameterDecayWithBase computes the decay factor for a parameter using base as the DecayInterval
func ScoreParameterDecayWithBase(decay time.Duration, base time.Duration, decayToZero float64) float64 {
// the decay is linear, so after n ticks the value is factor^n
// so factor^n = decayToZero => factor = decayToZero^(1/n)
Expand Down
Loading

0 comments on commit 8866ca8

Please sign in to comment.