From f8ba0c1e3fbb4fec64cdba8a631fc2a7c942ea22 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Sun, 26 Apr 2020 15:52:33 -0400 Subject: [PATCH 1/4] beginnings of nested dhts --- lookup.go | 6 +++++ nesting/nesting.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++ query.go | 10 +++++---- routing.go | 21 ++++++++++++++--- 4 files changed, 86 insertions(+), 7 deletions(-) create mode 100644 nesting/nesting.go diff --git a/lookup.go b/lookup.go index 057390c49..327cafa95 100644 --- a/lookup.go +++ b/lookup.go @@ -72,6 +72,10 @@ func (lk loggableKeyBytes) String() string { // If the context is canceled, this function will return the context error along // with the closest K peers it has found so far. func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) { + return dht.GetClosestPeersSeeded(ctx, key, nil) +} + +func (dht *IpfsDHT) GetClosestPeersSeeded(ctx context.Context, key string, seedPeers []peer.ID) (<-chan peer.ID, error) { if key == "" { return nil, fmt.Errorf("can't lookup empty key") } @@ -101,6 +105,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee return peers, err }, func() bool { return false }, + seedPeers, ) if err != nil { @@ -121,3 +126,4 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee return out, ctx.Err() } + diff --git a/nesting/nesting.go b/nesting/nesting.go new file mode 100644 index 000000000..ba20f777e --- /dev/null +++ b/nesting/nesting.go @@ -0,0 +1,56 @@ +package nesting + +import ( + "context" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/libp2p/go-libp2p-core/host" + + dht "github.com/libp2p/go-libp2p-kad-dht" +) + +type DHT struct { + Inner, Outer *dht.IpfsDHT +} + +func New(ctx context.Context, h host.Host, innerOptions []dht.Option, outerOptions []dht.Option) (*DHT, error) { + inner, err := dht.New(ctx, h, innerOptions...) + if err != nil { + return nil, err + } + + outer, err := dht.New(ctx, h, outerOptions...) + if err != nil { + return nil, err + } + + d := &DHT{ + Inner: inner, + Outer: outer, + } + + return d, nil +} + +func (dht *DHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) { + var innerResult []peer.ID + peerCh, err := dht.Inner.GetClosestPeersSeeded(ctx, key, nil) + if err == nil { + innerResult = getPeersFromCh(peerCh) + } + + outerResultCh, err := dht.Outer.GetClosestPeersSeeded(ctx, key, innerResult) + if err != nil { + return nil, err + } + + return getPeersFromCh(outerResultCh), nil +} + +func getPeersFromCh(peerCh <-chan peer.ID) []peer.ID { + var peers []peer.ID + for p := range peerCh { + peers = append(peers, p) + } + return peers +} \ No newline at end of file diff --git a/query.go b/query.go index 43eb808ec..deab0f4c3 100644 --- a/query.go +++ b/query.go @@ -77,9 +77,9 @@ type lookupWithFollowupResult struct { // // After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the // lookup that have not already been successfully queried. -func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { +func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, seedPeers []peer.ID) (*lookupWithFollowupResult, error) { // run the query - lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn) + lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn, seedPeers) if err != nil { return nil, err } @@ -137,10 +137,12 @@ processFollowUp: return lookupRes, nil } -func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { +func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, seedPeers []peer.ID) (*lookupWithFollowupResult, error) { // pick the K closest peers to the key in our Routing table. targetKadID := kb.ConvertKey(target) - seedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize) + if seedPeers == nil { + seedPeers = dht.routingTable.NearestPeers(targetKadID, dht.bucketSize) + } if len(seedPeers) == 0 { routing.PublishQueryEvent(ctx, &routing.QueryEvent{ Type: routing.QueryError, diff --git a/routing.go b/routing.go index b57e0ae84..be7875972 100644 --- a/routing.go +++ b/routing.go @@ -290,6 +290,10 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte } func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) { + return dht.getValuesSeeded(ctx, key, stopQuery, nil) +} + +func (dht *IpfsDHT) getValuesSeeded(ctx context.Context, key string, stopQuery chan struct{}, seedPeers []peer.ID) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) { valCh := make(chan RecvdVal, 1) lookupResCh := make(chan *lookupWithFollowupResult, 1) @@ -365,6 +369,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st return false } }, + seedPeers, ) if err != nil { @@ -372,7 +377,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st } lookupResCh <- lookupRes - if ctx.Err() == nil { + if ctx.Err() == nil && seedPeers == nil { dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes) } }() @@ -525,6 +530,10 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i } func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { + dht.findProvidersAsyncRoutineSeeded(ctx, key,count, peerOut, nil) +} + +func (dht *IpfsDHT) findProvidersAsyncRoutineSeeded(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo, seedPeers []peer.ID) { logger.Debugw("finding providers", "key", key) defer close(peerOut) @@ -608,15 +617,20 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash func() bool { return !findAll && ps.Size() >= count }, + seedPeers, ) - if err == nil && ctx.Err() == nil { + if err == nil && ctx.Err() == nil && seedPeers == nil { dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes) } } // FindPeer searches for a peer with given ID. func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) { + return dht.FindPeerSeeded(ctx, id, nil) +} + +func (dht *IpfsDHT) FindPeerSeeded(ctx context.Context, id peer.ID, seedPeers []peer.ID) (peer.AddrInfo, error) { if err := id.Validate(); err != nil { return peer.AddrInfo{}, err } @@ -655,6 +669,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, func() bool { return dht.host.Network().Connectedness(id) == network.Connected }, + seedPeers, ) if err != nil { @@ -680,4 +695,4 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, } return peer.AddrInfo{}, routing.ErrNotFound -} +} \ No newline at end of file From eefb8fceeaaeb03b9aa95b38b79d2719907ca1c9 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Sun, 26 Apr 2020 23:01:16 -0400 Subject: [PATCH 2/4] add test protocol so that we can runner the outer DHT in client mode, while the inner one runs in server mode --- lookup.go | 1 - nesting/nesting.go | 56 ---------------------------------------------- routing.go | 4 ++-- 3 files changed, 2 insertions(+), 59 deletions(-) delete mode 100644 nesting/nesting.go diff --git a/lookup.go b/lookup.go index 327cafa95..e333fec54 100644 --- a/lookup.go +++ b/lookup.go @@ -126,4 +126,3 @@ func (dht *IpfsDHT) GetClosestPeersSeeded(ctx context.Context, key string, seedP return out, ctx.Err() } - diff --git a/nesting/nesting.go b/nesting/nesting.go deleted file mode 100644 index ba20f777e..000000000 --- a/nesting/nesting.go +++ /dev/null @@ -1,56 +0,0 @@ -package nesting - -import ( - "context" - "github.com/libp2p/go-libp2p-core/peer" - - "github.com/libp2p/go-libp2p-core/host" - - dht "github.com/libp2p/go-libp2p-kad-dht" -) - -type DHT struct { - Inner, Outer *dht.IpfsDHT -} - -func New(ctx context.Context, h host.Host, innerOptions []dht.Option, outerOptions []dht.Option) (*DHT, error) { - inner, err := dht.New(ctx, h, innerOptions...) - if err != nil { - return nil, err - } - - outer, err := dht.New(ctx, h, outerOptions...) - if err != nil { - return nil, err - } - - d := &DHT{ - Inner: inner, - Outer: outer, - } - - return d, nil -} - -func (dht *DHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) { - var innerResult []peer.ID - peerCh, err := dht.Inner.GetClosestPeersSeeded(ctx, key, nil) - if err == nil { - innerResult = getPeersFromCh(peerCh) - } - - outerResultCh, err := dht.Outer.GetClosestPeersSeeded(ctx, key, innerResult) - if err != nil { - return nil, err - } - - return getPeersFromCh(outerResultCh), nil -} - -func getPeersFromCh(peerCh <-chan peer.ID) []peer.ID { - var peers []peer.ID - for p := range peerCh { - peers = append(peers, p) - } - return peers -} \ No newline at end of file diff --git a/routing.go b/routing.go index be7875972..eceafa60b 100644 --- a/routing.go +++ b/routing.go @@ -530,7 +530,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i } func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { - dht.findProvidersAsyncRoutineSeeded(ctx, key,count, peerOut, nil) + dht.findProvidersAsyncRoutineSeeded(ctx, key, count, peerOut, nil) } func (dht *IpfsDHT) findProvidersAsyncRoutineSeeded(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo, seedPeers []peer.ID) { @@ -695,4 +695,4 @@ func (dht *IpfsDHT) FindPeerSeeded(ctx context.Context, id peer.ID, seedPeers [] } return peer.AddrInfo{}, routing.ErrNotFound -} \ No newline at end of file +} From ef542addb3a848d8afddc78955243968a6c1bce4 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Sun, 26 Apr 2020 23:27:44 -0400 Subject: [PATCH 3/4] add transfer client messages --- dht_net.go | 19 +++++- nesting.go | 175 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 192 insertions(+), 2 deletions(-) create mode 100644 nesting.go diff --git a/dht_net.go b/dht_net.go index 167fc9b1c..88b0050ec 100644 --- a/dht_net.go +++ b/dht_net.go @@ -4,6 +4,8 @@ import ( "bufio" "context" "fmt" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/protocol" "io" "sync" "time" @@ -274,6 +276,14 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message return nil } +func (dht *IpfsDHT) getHost() host.Host { + return dht.host +} + +func (dht *IpfsDHT) getProtocols() []protocol.ID { + return append([]protocol.ID{},dht.protocols...) +} + func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) { dht.smlk.Lock() ms, ok := dht.strmap[p] @@ -306,12 +316,17 @@ func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messa return ms, nil } +type protocolSender interface { + getHost() host.Host + getProtocols() []protocol.ID +} + type messageSender struct { s network.Stream r msgio.ReadCloser lk ctxMutex p peer.ID - dht *IpfsDHT + dht protocolSender invalid bool singleMes int @@ -352,7 +367,7 @@ func (ms *messageSender) prep(ctx context.Context) error { // We only want to speak to peers using our primary protocols. We do not want to query any peer that only speaks // one of the secondary "server" protocols that we happen to support (e.g. older nodes that we can respond to for // backwards compatibility reasons). - nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...) + nstr, err := ms.dht.getHost().NewStream(ctx, ms.p, ms.dht.getProtocols()...) if err != nil { return err } diff --git a/nesting.go b/nesting.go new file mode 100644 index 000000000..d1d0aa192 --- /dev/null +++ b/nesting.go @@ -0,0 +1,175 @@ +package dht + +import ( + "context" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + pstore "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/libp2p/go-libp2p-core/protocol" + pb "github.com/libp2p/go-libp2p-kad-dht/pb" + "github.com/libp2p/go-msgio" + "io" + "time" + + "github.com/libp2p/go-libp2p-core/host" +) + +type NestedDHT struct { + Inner, Outer *IpfsDHT +} + +const transferProto protocol.ID = "/adin/ipfs/kad/1.0.0" + +func NewNested(ctx context.Context, h host.Host, innerOptions []Option, outerOptions []Option) (*NestedDHT, error) { + inner, err := New(ctx, h, innerOptions...) + if err != nil { + return nil, err + } + + outer, err := New(ctx, h, outerOptions...) + if err != nil { + return nil, err + } + + d := &NestedDHT{ + Inner: inner, + Outer: outer, + } + + h.SetStreamHandler(transferProto, func(s network.Stream) { + defer s.Reset() //nolint + if d.handleNewMessage(s) { + // Gracefully close the stream for writes. + s.Close() + } + }) + + return d, nil +} + +// Returns true on orderly completion of writes (so we can Close the stream). +func (dht *NestedDHT) handleNewMessage(s network.Stream) bool { + ctx := dht.Inner.ctx + r := msgio.NewVarintReaderSize(s, network.MessageSizeMax) + + mPeer := s.Conn().RemotePeer() + + timer := time.AfterFunc(dhtStreamIdleTimeout, func() { _ = s.Reset() }) + defer timer.Stop() + + for { + if dht.Inner.getMode() != modeServer { + logger.Errorf("ignoring incoming dht message while not in server mode") + return false + } + + var req pb.Message + msgbytes, err := r.ReadMsg() + msgLen := len(msgbytes) + if err != nil { + r.ReleaseMsg(msgbytes) + if err == io.EOF { + return true + } + // This string test is necessary because there isn't a single stream reset error + // instance in use. + if err.Error() != "stream reset" { + logger.Debugf("error reading message: %#v", err) + } + if msgLen > 0 { + } + return false + } + err = req.Unmarshal(msgbytes) + r.ReleaseMsg(msgbytes) + if err != nil { + return false + } + + timer.Reset(dhtStreamIdleTimeout) + + var handler dhtHandler + if req.GetType() == pb.Message_FIND_NODE { + handler = dht.Outer.handlerForMsgType(req.GetType()) + } + + if handler == nil { + return false + } + + resp, err := handler(ctx, mPeer, &req) + if err != nil { + return false + } + + if resp == nil { + continue + } + + // send out response msg + err = writeMsg(s, resp) + if err != nil { + return false + } + } +} + +type nestedProtocolSender struct { + host host.Host + proto protocol.ID +} + +func (ps *nestedProtocolSender) getHost() host.Host { + return ps.host +} + +func (ps *nestedProtocolSender) getProtocols() []protocol.ID { + return []protocol.ID{ps.proto} +} + +func (dht *NestedDHT) transferGCP(ctx context.Context, p peer.ID, key string) ([]*peer.AddrInfo, error){ + pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(key), 0) + nps := &nestedProtocolSender{host: dht.Inner.host, proto: transferProto} + ms := &messageSender{p: p, dht: nps, lk: newCtxMutex()} + resp, err := ms.SendRequest(ctx,pmes) + if err != nil { + return nil, err + } + peers := pb.PBPeersToPeerInfos(resp.GetCloserPeers()) + return peers, nil +} + +func (dht *NestedDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) { + var innerResult []peer.ID + peerCh, err := dht.Inner.GetClosestPeersSeeded(ctx, key, nil) + if err == nil { + innerResult = getPeersFromCh(peerCh) + } + + seedPeerSet := peer.NewSet() + for _, p := range innerResult { + innerTransferPeers, err := dht.transferGCP(ctx, p, key) + if err == nil { + for _, outerPeer := range innerTransferPeers { + if seedPeerSet.TryAdd(outerPeer.ID) { + dht.Inner.host.Peerstore().AddAddrs(outerPeer.ID, outerPeer.Addrs, pstore.TempAddrTTL) + } + } + } + } + + outerResultCh, err := dht.Outer.GetClosestPeersSeeded(ctx, key, seedPeerSet.Peers()) + if err != nil { + return nil, err + } + + return getPeersFromCh(outerResultCh), nil +} + +func getPeersFromCh(peerCh <-chan peer.ID) []peer.ID { + var peers []peer.ID + for p := range peerCh { + peers = append(peers, p) + } + return peers +} From 17d4fe62c458b66a57b0dd2f4e29d0264852826a Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 1 May 2020 03:48:46 -0400 Subject: [PATCH 4/4] moved nesting dht into a separate package. removed ability to be a server of the inner dht and a client of the outer dht from the nesting dht. --- dht_net.go | 19 +---- nesting.go | 175 --------------------------------------------- nesting/nesting.go | 123 +++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+), 192 deletions(-) delete mode 100644 nesting.go create mode 100644 nesting/nesting.go diff --git a/dht_net.go b/dht_net.go index 88b0050ec..167fc9b1c 100644 --- a/dht_net.go +++ b/dht_net.go @@ -4,8 +4,6 @@ import ( "bufio" "context" "fmt" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/protocol" "io" "sync" "time" @@ -276,14 +274,6 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message return nil } -func (dht *IpfsDHT) getHost() host.Host { - return dht.host -} - -func (dht *IpfsDHT) getProtocols() []protocol.ID { - return append([]protocol.ID{},dht.protocols...) -} - func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) { dht.smlk.Lock() ms, ok := dht.strmap[p] @@ -316,17 +306,12 @@ func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messa return ms, nil } -type protocolSender interface { - getHost() host.Host - getProtocols() []protocol.ID -} - type messageSender struct { s network.Stream r msgio.ReadCloser lk ctxMutex p peer.ID - dht protocolSender + dht *IpfsDHT invalid bool singleMes int @@ -367,7 +352,7 @@ func (ms *messageSender) prep(ctx context.Context) error { // We only want to speak to peers using our primary protocols. We do not want to query any peer that only speaks // one of the secondary "server" protocols that we happen to support (e.g. older nodes that we can respond to for // backwards compatibility reasons). - nstr, err := ms.dht.getHost().NewStream(ctx, ms.p, ms.dht.getProtocols()...) + nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...) if err != nil { return err } diff --git a/nesting.go b/nesting.go deleted file mode 100644 index d1d0aa192..000000000 --- a/nesting.go +++ /dev/null @@ -1,175 +0,0 @@ -package dht - -import ( - "context" - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" - pstore "github.com/libp2p/go-libp2p-core/peerstore" - "github.com/libp2p/go-libp2p-core/protocol" - pb "github.com/libp2p/go-libp2p-kad-dht/pb" - "github.com/libp2p/go-msgio" - "io" - "time" - - "github.com/libp2p/go-libp2p-core/host" -) - -type NestedDHT struct { - Inner, Outer *IpfsDHT -} - -const transferProto protocol.ID = "/adin/ipfs/kad/1.0.0" - -func NewNested(ctx context.Context, h host.Host, innerOptions []Option, outerOptions []Option) (*NestedDHT, error) { - inner, err := New(ctx, h, innerOptions...) - if err != nil { - return nil, err - } - - outer, err := New(ctx, h, outerOptions...) - if err != nil { - return nil, err - } - - d := &NestedDHT{ - Inner: inner, - Outer: outer, - } - - h.SetStreamHandler(transferProto, func(s network.Stream) { - defer s.Reset() //nolint - if d.handleNewMessage(s) { - // Gracefully close the stream for writes. - s.Close() - } - }) - - return d, nil -} - -// Returns true on orderly completion of writes (so we can Close the stream). -func (dht *NestedDHT) handleNewMessage(s network.Stream) bool { - ctx := dht.Inner.ctx - r := msgio.NewVarintReaderSize(s, network.MessageSizeMax) - - mPeer := s.Conn().RemotePeer() - - timer := time.AfterFunc(dhtStreamIdleTimeout, func() { _ = s.Reset() }) - defer timer.Stop() - - for { - if dht.Inner.getMode() != modeServer { - logger.Errorf("ignoring incoming dht message while not in server mode") - return false - } - - var req pb.Message - msgbytes, err := r.ReadMsg() - msgLen := len(msgbytes) - if err != nil { - r.ReleaseMsg(msgbytes) - if err == io.EOF { - return true - } - // This string test is necessary because there isn't a single stream reset error - // instance in use. - if err.Error() != "stream reset" { - logger.Debugf("error reading message: %#v", err) - } - if msgLen > 0 { - } - return false - } - err = req.Unmarshal(msgbytes) - r.ReleaseMsg(msgbytes) - if err != nil { - return false - } - - timer.Reset(dhtStreamIdleTimeout) - - var handler dhtHandler - if req.GetType() == pb.Message_FIND_NODE { - handler = dht.Outer.handlerForMsgType(req.GetType()) - } - - if handler == nil { - return false - } - - resp, err := handler(ctx, mPeer, &req) - if err != nil { - return false - } - - if resp == nil { - continue - } - - // send out response msg - err = writeMsg(s, resp) - if err != nil { - return false - } - } -} - -type nestedProtocolSender struct { - host host.Host - proto protocol.ID -} - -func (ps *nestedProtocolSender) getHost() host.Host { - return ps.host -} - -func (ps *nestedProtocolSender) getProtocols() []protocol.ID { - return []protocol.ID{ps.proto} -} - -func (dht *NestedDHT) transferGCP(ctx context.Context, p peer.ID, key string) ([]*peer.AddrInfo, error){ - pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(key), 0) - nps := &nestedProtocolSender{host: dht.Inner.host, proto: transferProto} - ms := &messageSender{p: p, dht: nps, lk: newCtxMutex()} - resp, err := ms.SendRequest(ctx,pmes) - if err != nil { - return nil, err - } - peers := pb.PBPeersToPeerInfos(resp.GetCloserPeers()) - return peers, nil -} - -func (dht *NestedDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) { - var innerResult []peer.ID - peerCh, err := dht.Inner.GetClosestPeersSeeded(ctx, key, nil) - if err == nil { - innerResult = getPeersFromCh(peerCh) - } - - seedPeerSet := peer.NewSet() - for _, p := range innerResult { - innerTransferPeers, err := dht.transferGCP(ctx, p, key) - if err == nil { - for _, outerPeer := range innerTransferPeers { - if seedPeerSet.TryAdd(outerPeer.ID) { - dht.Inner.host.Peerstore().AddAddrs(outerPeer.ID, outerPeer.Addrs, pstore.TempAddrTTL) - } - } - } - } - - outerResultCh, err := dht.Outer.GetClosestPeersSeeded(ctx, key, seedPeerSet.Peers()) - if err != nil { - return nil, err - } - - return getPeersFromCh(outerResultCh), nil -} - -func getPeersFromCh(peerCh <-chan peer.ID) []peer.ID { - var peers []peer.ID - for p := range peerCh { - peers = append(peers, p) - } - return peers -} diff --git a/nesting/nesting.go b/nesting/nesting.go new file mode 100644 index 000000000..52c8ad40e --- /dev/null +++ b/nesting/nesting.go @@ -0,0 +1,123 @@ +package nesting + +import ( + "context" + "fmt" + "github.com/hashicorp/go-multierror" + "github.com/ipfs/go-cid" + ci "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/routing" + "github.com/pkg/errors" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/libp2p/go-libp2p-kad-dht" +) + +// DHT implements the routing interface to +type DHT struct { + Inner, Outer *dht.IpfsDHT +} + +// Assert that IPFS assumptions about interfaces aren't broken. These aren't a +// guarantee, but we can use them to aid refactoring. +var ( + _ routing.ContentRouting = (*DHT)(nil) + _ routing.Routing = (*DHT)(nil) + _ routing.PeerRouting = (*DHT)(nil) + _ routing.PubKeyFetcher = (*DHT)(nil) + _ routing.ValueStore = (*DHT)(nil) +) + +func New(ctx context.Context, h host.Host, innerOptions []dht.Option, outerOptions []dht.Option) (*DHT, error) { + inner, err := dht.New(ctx, h, innerOptions...) + if err != nil { + return nil, err + } + + outer, err := dht.New(ctx, h, outerOptions...) + if err != nil { + return nil, err + } + + d := &DHT{ + Inner: inner, + Outer: outer, + } + + return d, nil +} + +func (dht *DHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) { + var innerResult []peer.ID + peerCh, err := dht.Inner.GetClosestPeersSeeded(ctx, key, nil) + if err == nil { + innerResult = getPeersFromCh(peerCh) + } + + outerResultCh, err := dht.Outer.GetClosestPeersSeeded(ctx, key, innerResult) + if err != nil { + return nil, err + } + + return getPeersFromCh(outerResultCh), nil +} + +func getPeersFromCh(peerCh <-chan peer.ID) []peer.ID { + var peers []peer.ID + for p := range peerCh { + peers = append(peers, p) + } + return peers +} + +func (dht *DHT) GetPublicKey(ctx context.Context, id peer.ID) (ci.PubKey, error) { + panic("implement me") +} + +func (dht *DHT) Provide(ctx context.Context, cid cid.Cid, b bool) error { + panic("implement me") +} + +func (dht *DHT) FindProvidersAsync(ctx context.Context, cid cid.Cid, i int) <-chan peer.AddrInfo { + panic("implement me") +} + +func (dht *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { + panic("implement me") +} + +func (dht *DHT) PutValue(ctx context.Context, s string, bytes []byte, option ...routing.Option) error { + panic("implement me") +} + +func (dht *DHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) { + panic("implement me") +} + +func (dht *DHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) { + panic("implement me") +} + +func (dht *DHT) Bootstrap(ctx context.Context) error { + errI := dht.Inner.Bootstrap(ctx) + errO := dht.Outer.Bootstrap(ctx) + + errs := make([]error, 0, 2) + if errI != nil { + errs = append(errs, errors.Wrap(errI, fmt.Sprintf("failed to bootstrap inner dht"))) + } + if errO != nil { + errs = append(errs, errors.Wrap(errI, fmt.Sprintf("failed to bootstrap outer dht"))) + } + + switch len(errs) { + case 0: + return nil + case 1: + return errs[0] + default: + return multierror.Append(errs[0], errs[1:]...) + } +} \ No newline at end of file