-
Notifications
You must be signed in to change notification settings - Fork 232
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Periodically persist routing table snapshots #650
base: feat/persist-restore-RT
Are you sure you want to change the base?
Conversation
ping @aschmahmann . |
@aschmahmann Have changed the base branch to the feature branch. Let's get this in first and then the restore changes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks pretty good, mostly made some code organization comments. I don't feel super strongly about any of them, but I do think they'll make the code simpler, let me know if you disagree.
I also added a comment about my concerns with the transition to signed peer records for you to respond to.
|
||
|
||
// Encapsulates a routing table snapshot for persistence. Not to be transmitted over the wire. | ||
message RoutingTableSnapshot { | ||
message Peer { | ||
// ID of a given peer. | ||
bytes id = 1; | ||
|
||
// multiaddrs for a given peer | ||
repeated bytes addrs = 2; | ||
|
||
// timestamp for when the peer was added to the Routing Table. | ||
// Unix epoch nano seconds. | ||
int64 addedAtNs = 3; | ||
} | ||
|
||
// The peers that were members of the routing table. | ||
repeated Peer peers = 1; | ||
|
||
// The timestamp when this snapshot was taken. | ||
// Unix epoch nano seconds. | ||
int64 timestampNs = 2; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd probably move this into the persist package since you've made one anyway
func (p *RoutingTableSnapshot_Peer) Addresses() []ma.Multiaddr { | ||
if p == nil { | ||
return nil | ||
} | ||
|
||
maddrs := make([]ma.Multiaddr, 0, len(p.Addrs)) | ||
for _, addr := range p.Addrs { | ||
maddr, err := ma.NewMultiaddrBytes(addr) | ||
if err != nil { | ||
log.Debugw("error decoding multiaddr for peer", "peer", peer.ID(p.Id), "error", err) | ||
continue | ||
} | ||
|
||
maddrs = append(maddrs, maddr) | ||
} | ||
return maddrs | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd probably move this into the persist package since you've made one anyway
ID peer.ID | ||
Addrs []ma.Multiaddr |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: any reason to keep these separate over using a peer.AddrInfo?
bytes id = 1; | ||
|
||
// multiaddrs for a given peer | ||
repeated bytes addrs = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aarshkshah1992 if you'd prefer we can discuss this in the restore PR, but when we do a restore and try and load the addresses into the peerstore are we going to run into any issues if we had a signed peer record and then try to restore an unsigned peer record?
// A Snapshotter provides the ability to save and restore a routing table from a Persistent medium. | ||
type Snapshotter interface { | ||
// Load recovers a snapshot from storage, and returns candidates to integrate in a fresh routing table. | ||
Load() ([]*RtSnapshotPeerInfo, error) | ||
|
||
// Store persists the current state of the routing table. | ||
Store(h host.Host, rt *kbucket.RoutingTable) error | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious, WDYT about defining this in the DHT instead, i.e. define it where it's used instead of created?
This will make doing the check var _ Snapshotter = (*dsSnapshotter)(nil)
not work. However, this won't be necessary since our code won't compile as the DHT makes use of *dsSnapshotter
.
I'd feel more strongly about this if this package was in a separate repo from the DHT since then the DHT would need to depend on an interface repo. As it is, this is more of a WDYT.
s := &dht_pb.RoutingTableSnapshot{} | ||
if err := s.Unmarshal(val); err != nil { | ||
return nil, fmt.Errorf("failed to unmarshal snapshot: %w", err) | ||
} | ||
|
||
result := make([]*RtSnapshotPeerInfo, 0, len(s.Peers)) | ||
for i := range s.Peers { | ||
p := s.Peers[i] | ||
var id peer.ID | ||
if err := id.Unmarshal(p.Id); err != nil { | ||
logSnapshot.Warnw("failed to unmarshal peerId from snapshot", "err", err) | ||
continue | ||
} | ||
|
||
result = append(result, &RtSnapshotPeerInfo{ | ||
ID: id, | ||
Addrs: p.Addresses(), | ||
AddedAt: time.Unix(0, p.AddedAtNs)}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code is just about unmarshalling a routing table snapshot into the nice application friendly form. I'd put the conversion functions next to the protobufs like we do for the DHT messages in pb/message.go
snapshotPeers := make([]*dht_pb.RoutingTableSnapshot_Peer, 0, len(pinfos)) | ||
|
||
for _, p := range pinfos { | ||
id, err := p.Id.MarshalBinary() | ||
if err != nil { | ||
logSnapshot.Warnw("encountered error while adding peer to routing table snapshot; skipping", "peer", p.Id, "err", err) | ||
continue | ||
} | ||
rp := &dht_pb.RoutingTableSnapshot_Peer{} | ||
rp.Id = id | ||
addrs := h.Peerstore().Addrs(p.Id) | ||
rp.Addrs = make([][]byte, len(addrs)) | ||
for i, maddr := range addrs { | ||
rp.Addrs[i] = maddr.Bytes() | ||
} | ||
|
||
rp.AddedAtNs = p.AddedAt.UnixNano() | ||
snapshotPeers = append(snapshotPeers, rp) | ||
} | ||
|
||
snap := dht_pb.RoutingTableSnapshot{ | ||
Peers: snapshotPeers, | ||
TimestampNs: time.Now().Unix(), | ||
} | ||
|
||
bytes, err := snap.Marshal() | ||
if err != nil { | ||
return fmt.Errorf("failed to marshal snapshot %w", err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code is just about marshalling a routing table snapshot from the nice application friendly form. I'd put the conversion functions next to the protobufs like we do for the DHT messages in pb/message.go
} | ||
|
||
// assert snapshot & close dht | ||
time.Sleep(500 * time.Millisecond) // wait for one snapshot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why 500ms when a snapshot should happen every 100ms? Just want to make sure I'm not missing anything
@@ -387,6 +390,56 @@ func TestValueSetInvalid(t *testing.T) { | |||
testSetGet("valid", true, "newer", nil) | |||
} | |||
|
|||
func TestRoutingTableSnapshot(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe rename to TestRoutingTableSnapshotStore
or something unless you're going to just extend this test when you test snapshot restoring.
@Stebalien @aschmahmann
One half of #387.
Once we land this, will create a PR for seeding the Routing Table when the DHT starts.