[chore][tracker]: save most recent (archive) write index to disk #36799

Status: Open. Wants to merge 9 commits into base: main.

Changes from 1 commit
lint and test
VihasMakwana committed Dec 12, 2024
commit c8a4c513a4b935df1595aa642113abc5106245aa
9 changes: 3 additions & 6 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -159,23 +159,20 @@ func (t *fileTracker) restoreArchiveIndex() {
 	if !t.archiveEnabled() {
 		return
 	}
-	archiveIndex := 0
 	byteIndex, err := t.persister.Get(context.Background(), archiveIndexKey)
 	if err != nil {
 		t.set.Logger.Error("error while reading the archiveIndexKey. Starting from 0", zap.Error(err))
 		return
 	}
-	archiveIndex, err = byteToIndex(byteIndex)
+	t.archiveIndex, err = byteToIndex(byteIndex)
 	if err != nil {
 		t.set.Logger.Error("error getting read index. Starting from 0", zap.Error(err))
 		return
-	} else if archiveIndex < 0 || archiveIndex >= t.pollsToArchive {
+	} else if t.archiveIndex < 0 || t.archiveIndex >= t.pollsToArchive {
 		// safety check. It can happen if `polls_to_archive` was changed.
 		// It's best if we reset the index or else we might end up writing invalid keys
 		t.set.Logger.Warn("the read index was found, but it exceeds the bounds. Starting from 0")
-		return
+		t.archiveIndex = 0
 	}
djaglowski (Member) commented:
Good idea to check for this case.

However, I wonder if we can handle it better than restarting from zero. What would it take to search the archive for the most recently updated?

I think we could maintain some kind of data structure which notes the time each archive was written. Maybe just map[index]time.Time. Then when we first create the tracker, we can load this up and find the most recent timestamp. We can also check for the case where pollsToArchive has changed and then rewrite the storage to align with the new value.

For example, if we previously saved 10 archives and find that pollsToArchive is now 5, we can find the 5 most recent indices based on the timestamp structure, then rewrite the archive files so that these are 0-4. We should probably even delete the extras from storage as well.
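
A rough sketch of the bookkeeping described above, assuming the timestamps live in a map[int]time.Time that is loaded when the tracker is created. The type and helper names (archiveTimes, mostRecentIndex, keepNewest) are illustrative, not part of the PR:

package tracker

import (
	"sort"
	"time"
)

// archiveTimes records when each ring-buffer slot was last written.
type archiveTimes map[int]time.Time

// mostRecentIndex returns the slot with the newest timestamp, or 0 if
// nothing has been recorded yet.
func (a archiveTimes) mostRecentIndex() int {
	best, bestTime := 0, time.Time{}
	for idx, ts := range a {
		if ts.After(bestTime) {
			best, bestTime = idx, ts
		}
	}
	return best
}

// keepNewest returns the n most recently written slots, oldest first,
// so they can be rewritten as slots 0..n-1 when pollsToArchive shrinks;
// any indices not returned would then be deleted from storage.
func (a archiveTimes) keepNewest(n int) []int {
	idxs := make([]int, 0, len(a))
	for idx := range a {
		idxs = append(idxs, idx)
	}
	sort.Slice(idxs, func(i, j int) bool {
		return a[idxs[i]].Before(a[idxs[j]])
	})
	if len(idxs) > n {
		idxs = idxs[len(idxs)-n:]
	}
	return idxs
}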

VihasMakwana (Contributor, Author) commented on Dec 13, 2024:
@djaglowski This solution does make sense to me, but it becomes tricky when we eventually overwrite old archive data, since the archive is a ring buffer.
We might need to load the filesets into memory.
I'll explore a few approaches.

djaglowski (Member) replied:

> it becomes tricky when we eventually overwrite old archive data, since the archive is a ring buffer.

Can you elaborate?

> We might need to load the filesets into memory.

If it's more than one at a time, then it defeats the point of the archive.

-
-	t.archiveIndex = archiveIndex
 }
 
 func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
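
For context, the restore path above depends on an encode/decode pair for the persisted index. A minimal sketch of what byteToIndex and a matching encoder could look like, assuming a fixed-width little-endian encoding; the PR's actual implementation may differ, and indexToBytes is a hypothetical name:

package tracker

import (
	"encoding/binary"
	"errors"
)

// indexToBytes encodes an archive index for storage (assumed encoding).
func indexToBytes(idx int) []byte {
	buf := make([]byte, 8)
	binary.LittleEndian.PutUint64(buf, uint64(idx))
	return buf
}

// byteToIndex decodes an index previously written by indexToBytes.
func byteToIndex(buf []byte) (int, error) {
	if len(buf) != 8 {
		return 0, errors.New("invalid index encoding")
	}
	return int(binary.LittleEndian.Uint64(buf)), nil
}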
3 changes: 3 additions & 0 deletions pkg/stanza/operator/persister.go
@@ -43,5 +43,8 @@ func (p scopedPersister) Delete(ctx context.Context, key string) error {
 }
 
 func (p scopedPersister) Batch(ctx context.Context, ops ...storage.Operation) error {
+	for _, op := range ops {
+		op.Key = fmt.Sprintf("%s.%s", p.scope, op.Key)
+	}
 	return p.Persister.Batch(ctx, ops...)
 }
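
This mirrors the scoping already applied in Get, Set, and Delete: each key in a batched operation is prefixed with the operator's scope before delegating to the wrapped persister, so operators sharing one storage extension cannot collide on key names. A small illustration of the effect; the scope and key values here are made up for the example:

package main

import "fmt"

// scopeKey mimics the prefixing applied in scopedPersister.Batch.
func scopeKey(scope, key string) string {
	return fmt.Sprintf("%s.%s", scope, key)
}

func main() {
	// A batched write to "archiveIndex" from an operator scoped as
	// "file_input" lands under "file_input.archiveIndex" in storage.
	fmt.Println(scopeKey("file_input", "archiveIndex"))
}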