Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nesting DHTs #626

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (lk loggableKeyBytes) String() string {
// If the context is canceled, this function will return the context error along
// with the closest K peers it has found so far.
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
return dht.GetClosestPeersSeeded(ctx, key, nil)
}

func (dht *IpfsDHT) GetClosestPeersSeeded(ctx context.Context, key string, seedPeers []peer.ID) (<-chan peer.ID, error) {
if key == "" {
return nil, fmt.Errorf("can't lookup empty key")
}
Expand Down Expand Up @@ -101,6 +105,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
return peers, err
},
func() bool { return false },
seedPeers,
)

if err != nil {
Expand Down
123 changes: 123 additions & 0 deletions nesting/nesting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package nesting

import (
"context"
"fmt"
"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
ci "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/pkg/errors"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/libp2p/go-libp2p-kad-dht"
)

// DHT implements the routing interface to
type DHT struct {
Inner, Outer *dht.IpfsDHT
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
_ routing.ContentRouting = (*DHT)(nil)
_ routing.Routing = (*DHT)(nil)
_ routing.PeerRouting = (*DHT)(nil)
_ routing.PubKeyFetcher = (*DHT)(nil)
_ routing.ValueStore = (*DHT)(nil)
)

func New(ctx context.Context, h host.Host, innerOptions []dht.Option, outerOptions []dht.Option) (*DHT, error) {
inner, err := dht.New(ctx, h, innerOptions...)
if err != nil {
return nil, err
}

outer, err := dht.New(ctx, h, outerOptions...)
if err != nil {
return nil, err
}

d := &DHT{
Inner: inner,
Outer: outer,
}

return d, nil
}

func (dht *DHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
var innerResult []peer.ID
peerCh, err := dht.Inner.GetClosestPeersSeeded(ctx, key, nil)
if err == nil {
innerResult = getPeersFromCh(peerCh)
}

outerResultCh, err := dht.Outer.GetClosestPeersSeeded(ctx, key, innerResult)
if err != nil {
return nil, err
}

return getPeersFromCh(outerResultCh), nil
}

func getPeersFromCh(peerCh <-chan peer.ID) []peer.ID {
var peers []peer.ID
for p := range peerCh {
peers = append(peers, p)
}
return peers
}

func (dht *DHT) GetPublicKey(ctx context.Context, id peer.ID) (ci.PubKey, error) {
panic("implement me")
}

func (dht *DHT) Provide(ctx context.Context, cid cid.Cid, b bool) error {
panic("implement me")
}

func (dht *DHT) FindProvidersAsync(ctx context.Context, cid cid.Cid, i int) <-chan peer.AddrInfo {
panic("implement me")
}

func (dht *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
panic("implement me")
}

func (dht *DHT) PutValue(ctx context.Context, s string, bytes []byte, option ...routing.Option) error {
panic("implement me")
}

func (dht *DHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) {
panic("implement me")
}

func (dht *DHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) {
panic("implement me")
}

func (dht *DHT) Bootstrap(ctx context.Context) error {
errI := dht.Inner.Bootstrap(ctx)
errO := dht.Outer.Bootstrap(ctx)

errs := make([]error, 0, 2)
if errI != nil {
errs = append(errs, errors.Wrap(errI, fmt.Sprintf("failed to bootstrap inner dht")))
}
if errO != nil {
errs = append(errs, errors.Wrap(errI, fmt.Sprintf("failed to bootstrap outer dht")))
}

switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return multierror.Append(errs[0], errs[1:]...)
}
}
10 changes: 6 additions & 4 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ type lookupWithFollowupResult struct {
//
// After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the
// lookup that have not already been successfully queried.
func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, seedPeers []peer.ID) (*lookupWithFollowupResult, error) {
// run the query
lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn)
lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn, seedPeers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -137,10 +137,12 @@ processFollowUp:
return lookupRes, nil
}

func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, seedPeers []peer.ID) (*lookupWithFollowupResult, error) {
// pick the K closest peers to the key in our Routing table.
targetKadID := kb.ConvertKey(target)
seedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize)
if seedPeers == nil {
seedPeers = dht.routingTable.NearestPeers(targetKadID, dht.bucketSize)
}
if len(seedPeers) == 0 {
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.QueryError,
Expand Down
19 changes: 17 additions & 2 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte
}

func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
return dht.getValuesSeeded(ctx, key, stopQuery, nil)
}

func (dht *IpfsDHT) getValuesSeeded(ctx context.Context, key string, stopQuery chan struct{}, seedPeers []peer.ID) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
valCh := make(chan RecvdVal, 1)
lookupResCh := make(chan *lookupWithFollowupResult, 1)

Expand Down Expand Up @@ -365,14 +369,15 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
return false
}
},
seedPeers,
)

if err != nil {
return
}
lookupResCh <- lookupRes

if ctx.Err() == nil {
if ctx.Err() == nil && seedPeers == nil {
dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes)
}
}()
Expand Down Expand Up @@ -525,6 +530,10 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
}

func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
dht.findProvidersAsyncRoutineSeeded(ctx, key, count, peerOut, nil)
}

func (dht *IpfsDHT) findProvidersAsyncRoutineSeeded(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo, seedPeers []peer.ID) {
logger.Debugw("finding providers", "key", key)

defer close(peerOut)
Expand Down Expand Up @@ -608,15 +617,20 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
func() bool {
return !findAll && ps.Size() >= count
},
seedPeers,
)

if err == nil && ctx.Err() == nil {
if err == nil && ctx.Err() == nil && seedPeers == nil {
dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes)
}
}

// FindPeer searches for a peer with given ID.
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
return dht.FindPeerSeeded(ctx, id, nil)
}

func (dht *IpfsDHT) FindPeerSeeded(ctx context.Context, id peer.ID, seedPeers []peer.ID) (peer.AddrInfo, error) {
if err := id.Validate(); err != nil {
return peer.AddrInfo{}, err
}
Expand Down Expand Up @@ -655,6 +669,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
func() bool {
return dht.host.Network().Connectedness(id) == network.Connected
},
seedPeers,
)

if err != nil {
Expand Down