Skip to content

Commit

Permalink
dht: add a prometheus metric for the size of the routing table
Browse files Browse the repository at this point in the history
This is a bit hacky maybe we want to do this through DI ? Would be nice
to have this as a per DHT metric.
  • Loading branch information
Jorropo committed Jun 13, 2023
1 parent 0f65ba1 commit 4ca59dd
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
29 changes: 26 additions & 3 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/libp2p/go-libp2p-kad-dht/internal"
dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
Expand All @@ -39,6 +37,10 @@ import (
ma "github.com/multiformats/go-multiaddr"
"go.opencensus.io/tag"
"go.uber.org/zap"

"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var (
Expand Down Expand Up @@ -153,6 +155,8 @@ type IpfsDHT struct {
// a bound channel to limit asynchronicity of in-flight ADD_PROVIDER RPCs
optProvJobsPool chan struct{}

rtSizeGauge prometheus.GaugeFunc

// configuration variables for tests
testAddressUpdateProcessing bool
}
Expand Down Expand Up @@ -221,6 +225,17 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
}
}

gauge := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "libp2p",
Subsystem: "dht_size",
Name: "routing table size",
Help: "The size of the full routing table.",
}, dht.getRtSizeForGauge)
if err := prometheus.Register(gauge); err == nil {
// Only unregister if register is successfull
dht.rtSizeGauge = gauge
}

// register for event bus and network notifications
sn, err := newSubscriberNotifiee(dht)
if err != nil {
Expand Down Expand Up @@ -372,6 +387,10 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
return dht, nil
}

func (dht *IpfsDHT) getRtSizeForGauge() float64 {
return float64(dht.routingTable.Size())
}

// lookupCheck performs a lookup request to a remote peer.ID, verifying that it is able to
// answer it correctly
func (dht *IpfsDHT) lookupCheck(ctx context.Context, p peer.ID) error {
Expand Down Expand Up @@ -841,7 +860,11 @@ func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {

// Close calls Process Close.
func (dht *IpfsDHT) Close() error {
return dht.proc.Close()
err := dht.proc.Close()
if dht.rtSizeGauge != nil {
prometheus.Unregister(dht.rtSizeGauge)
}
return err
}

func mkDsKey(s string) ds.Key {
Expand Down
25 changes: 24 additions & 1 deletion fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
swarm "github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/net/swarm"

"github.com/gogo/protobuf/proto"
u "github.com/ipfs/boxo/util"
Expand All @@ -44,6 +44,8 @@ import (
"github.com/libp2p/go-libp2p-xor/kademlia"
kadkey "github.com/libp2p/go-libp2p-xor/key"
"github.com/libp2p/go-libp2p-xor/trie"

"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -95,6 +97,8 @@ type FullRT struct {
bulkSendParallelism int

self peer.ID

rtSizeGauge prometheus.Gauge
}

// NewFullRT creates a DHT client that tracks the full network. It takes a protocol prefix for the given network,
Expand Down Expand Up @@ -197,6 +201,18 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
self: self,
}

counter := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "libp2p",
Subsystem: "dht_size",
Name: "routing table size",
Help: "The size of the full routing table.",
})
counter.Set(0)
if err := prometheus.Register(counter); err == nil {
// Only unregister if register is successfull
rt.rtSizeGauge = counter
}

rt.wg.Add(1)
go rt.runCrawler(ctx)

Expand Down Expand Up @@ -350,12 +366,19 @@ func (dht *FullRT) runCrawler(ctx context.Context) {
dht.rt = newRt
dht.lastCrawlTime = time.Now()
dht.rtLk.Unlock()

if dht.rtSizeGauge != nil {
dht.rtSizeGauge.Set(float64(len(m)))
}
}
}

func (dht *FullRT) Close() error {
dht.cancel()
err := dht.ProviderManager.Process().Close()
if dht.rtSizeGauge != nil {
prometheus.Unregister(dht.rtSizeGauge)
}
dht.wg.Wait()
return err
}
Expand Down

0 comments on commit 4ca59dd

Please sign in to comment.