Use Mem() instead of Size() to estimate pipelined-memdb size (#1286)
* fix: use Mem() instead of Size() to evaluate pipelined-memdb size, for better memory control

Signed-off-by: ekexium <[email protected]>

* test: fix TestPipelinedFlushTrigger

Signed-off-by: ekexium <[email protected]>

---------

Signed-off-by: ekexium <[email protected]>
ekexium authored Apr 15, 2024
1 parent a23f6ca commit 4183ab1
Showing 2 changed files with 66 additions and 39 deletions.
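For context on why the metric matters: MemDB's Size() reports the logical payload (the sum of key and value lengths), while Mem() reports the memory the structure actually holds, including allocator overhead, so triggering flushes on Size() alone can let real memory consumption drift well past the intended cap. The toy below is only an illustrative sketch of that divergence, not the client's MemDB; the 1.5x growth factor and 48-byte per-entry overhead are invented numbers.

```go
package main

import "fmt"

// toyMemDB is a stand-in for illustration: payload tracks what a Size()-style
// metric would report, arena tracks what a Mem()-style metric would report.
type toyMemDB struct {
	payload uint64 // sum of key+value lengths
	arena   uint64 // bytes reserved by the allocator, including overhead
}

func (db *toyMemDB) set(key, value []byte) {
	n := uint64(len(key) + len(value))
	db.payload += n
	// Arena-style allocators reserve memory in blocks and keep per-entry
	// metadata, so reserved memory grows faster than the raw payload.
	db.arena += n*3/2 + 48
}

func main() {
	db := &toyMemDB{}
	for i := 0; i < 10000; i++ {
		db.set([]byte(fmt.Sprintf("key-%d", i)), make([]byte, 100))
	}
	// A flush policy keyed on payload alone underestimates the real footprint.
	fmt.Printf("Size()-like payload: %d bytes, Mem()-like footprint: %d bytes\n",
		db.payload, db.arena)
}
```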
64 changes: 43 additions & 21 deletions internal/unionstore/pipelined_memdb.go
@@ -62,32 +62,32 @@ const (
// small batch can lead to poor performance and resource waste in random write workload.
// 10K batch size is large enough to get good performance with random write workloads in tests.
MinFlushKeys = 10000
// MinFlushSize is the minimum size of MemDB to trigger flush.
MinFlushSize = 16 * 1024 * 1024 // 16MB
// ForceFlushSizeThreshold is the threshold to force flush MemDB, which controls the max memory consumption of PipelinedMemDB.
ForceFlushSizeThreshold = 128 * 1024 * 1024 // 128MB
// MinFlushMemSize is the minimum size of MemDB to trigger flush.
MinFlushMemSize uint64 = 16 * 1024 * 1024 // 16MB
// ForceFlushMemSizeThreshold is the threshold to force flush MemDB, which controls the max memory consumption of PipelinedMemDB.
ForceFlushMemSizeThreshold uint64 = 128 * 1024 * 1024 // 128MB
)

type flushOption struct {
MinFlushKeys int
MinFlushSize int
ForceFlushSizeThreshold int
MinFlushKeys uint64
MinFlushMemSize uint64
ForceFlushMemSizeThreshold uint64
}

func newFlushOption() flushOption {
opt := flushOption{
MinFlushKeys: MinFlushKeys,
MinFlushSize: MinFlushSize,
ForceFlushSizeThreshold: ForceFlushSizeThreshold,
MinFlushKeys: MinFlushKeys,
MinFlushMemSize: MinFlushMemSize,
ForceFlushMemSizeThreshold: ForceFlushMemSizeThreshold,
}
if val, err := util.EvalFailpoint("pipelinedMemDBMinFlushKeys"); err == nil && val != nil {
opt.MinFlushKeys = val.(int)
opt.MinFlushKeys = uint64(val.(int))
}
if val, err := util.EvalFailpoint("pipelinedMemDBMinFlushSize"); err == nil && val != nil {
opt.MinFlushSize = val.(int)
opt.MinFlushMemSize = uint64(val.(int))
}
if val, err := util.EvalFailpoint("pipelinedMemDBForceFlushSizeThreshold"); err == nil && val != nil {
opt.ForceFlushSizeThreshold = val.(int)
opt.ForceFlushMemSizeThreshold = uint64(val.(int))
}
return opt
}
@@ -323,15 +323,33 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) {
}

func (p *PipelinedMemDB) needFlush() bool {
size := p.memDB.Size()
// size < MinFlushSize, do not flush.
// MinFlushSize <= size < ForceFlushSizeThreshold && keys < MinFlushKeys, do not flush.
// MinFlushSize <= size < ForceFlushSizeThreshold && keys >= MinFlushKeys, flush.
// size >= ForceFlushSizeThreshold, flush.
if size < p.flushOption.MinFlushSize || (p.memDB.Len() < p.flushOption.MinFlushKeys && size < p.flushOption.ForceFlushSizeThreshold) {
size := p.memDB.Mem()
// mem size < MinFlushMemSize, do not flush.
// MinFlushMemSize <= mem size < ForceFlushMemSizeThreshold && keys < MinFlushKeys, do not flush.
// MinFlushMemSize <= mem size < ForceFlushMemSizeThreshold && keys >= MinFlushKeys, flush.
// mem size >= ForceFlushMemSizeThreshold, flush.

/*
Keys
^
| |
| |
| |
| | Flush
| |
MinKey(10k) | +------------+
| | |
| No | No Flush | Flush
| Flush | |
+-----------------------------------------> Size
0 MinSize(16MB) Force(128MB)
*/
if size < p.flushOption.MinFlushMemSize ||
(uint64(p.memDB.Len()) < p.flushOption.MinFlushKeys &&
size < p.flushOption.ForceFlushMemSizeThreshold) {
return false
}
if p.onFlushing.Load() && size < p.flushOption.ForceFlushSizeThreshold {
if p.onFlushing.Load() && size < p.flushOption.ForceFlushMemSizeThreshold {
return false
}
return true
@@ -391,7 +409,11 @@ func (p *PipelinedMemDB) Len() int {
}

func (p *PipelinedMemDB) Size() int {
return p.memDB.Size() + p.size
size := p.size
if p.memDB != nil {
size += p.memDB.Size()
}
return size
}

func (p *PipelinedMemDB) OnFlushing() bool {
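To check the decision table (and the ASCII diagram) from needFlush above in isolation, here is a self-contained sketch of the same three-region logic. The constants are copied from the diff; the standalone function and the example calls are illustrative and not part of the change.

```go
package main

import "fmt"

const (
	minFlushKeys               uint64 = 10000
	minFlushMemSize            uint64 = 16 * 1024 * 1024  // 16MB
	forceFlushMemSizeThreshold uint64 = 128 * 1024 * 1024 // 128MB
)

// needFlush mirrors the hunk above: below MinFlushMemSize never flush;
// between the two size thresholds flush only once enough keys have
// accumulated; at or above ForceFlushMemSizeThreshold always flush, even
// while a previous flush is still in flight.
func needFlush(memBytes, keys uint64, onFlushing bool) bool {
	if memBytes < minFlushMemSize ||
		(keys < minFlushKeys && memBytes < forceFlushMemSizeThreshold) {
		return false
	}
	if onFlushing && memBytes < forceFlushMemSizeThreshold {
		return false
	}
	return true
}

func main() {
	fmt.Println(needFlush(8<<20, 20000, false))  // false: below MinFlushMemSize
	fmt.Println(needFlush(32<<20, 5000, false))  // false: size reached, too few keys
	fmt.Println(needFlush(32<<20, 20000, false)) // true: both thresholds reached
	fmt.Println(needFlush(200<<20, 5000, true))  // true: force threshold overrides
}
```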
41 changes: 23 additions & 18 deletions internal/unionstore/pipelined_memdb_test.go
@@ -32,45 +32,50 @@ func emptyBufferBatchGetter(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
}

func TestPipelinedFlushTrigger(t *testing.T) {
avgKeySize := MinFlushSize / MinFlushKeys
// because memdb's memory usage is hard to control, we use a cargo-culted value here.
avgKeySize := int(MinFlushMemSize/MinFlushKeys) / 3

// block the flush goroutine for checking the flushingMemDB status.
blockCh := make(chan struct{})
// Will not flush when keys number >= MinFlushKeys and size < MinFlushSize
// Will not flush when keys number >= MinFlushKeys and size < MinFlushMemSize
memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
<-blockCh
return nil
})
for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(i))
value := make([]byte, avgKeySize-len(key)-1) // (key + value) * MinFLushKeys < MinFlushKeys
value := make([]byte, avgKeySize-len(key)-1)
// (key + value) * MinFlushKeys < MinFlushMemSize
memdb.Set(key, value)
flushed, err := memdb.Flush(false)
require.False(t, flushed)
require.Nil(t, err)
require.False(t, memdb.OnFlushing())
}
require.Equal(t, memdb.memDB.Len(), MinFlushKeys)
require.Less(t, memdb.memDB.Size(), MinFlushSize)
require.Less(t, memdb.memDB.Mem(), MinFlushMemSize)

// Will not flush when keys number < MinFlushKeys and size >= MinFlushSize
// Will not flush when keys number < MinFlushKeys and size >= MinFlushMemSize
avgKeySize = int(MinFlushMemSize/MinFlushKeys) / 2
memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
<-blockCh
return nil
})
for i := 0; i < MinFlushKeys-1; i++ {
key := []byte(strconv.Itoa(i))
value := make([]byte, avgKeySize-len(key)+1) // (key + value) * (MinFLushKeys - 1) > MinFlushKeys
value := make([]byte, avgKeySize-len(key)+1)
// (key + value) * (MinFLushKeys - 1) > MinFlushMemSize
memdb.Set(key, value)
flushed, err := memdb.Flush(false)
require.False(t, flushed)
require.Nil(t, err)
require.False(t, memdb.OnFlushing())
}
require.Less(t, memdb.memDB.Len(), MinFlushKeys)
require.Greater(t, memdb.memDB.Size(), MinFlushSize)
require.Greater(t, memdb.memDB.Mem(), MinFlushMemSize)
require.Less(t, memdb.memDB.Mem(), ForceFlushMemSizeThreshold)

// Flush when keys number >= MinFlushKeys and size >= MinFlushSize
// Flush when keys number >= MinFlushKeys and mem size >= MinFlushMemSize
memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
<-blockCh
return nil
@@ -106,7 +111,7 @@ func TestPipelinedFlushSkip(t *testing.T) {
})
for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(i))
value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1)
value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
memdb.Set(key, value)
}
flushed, err := memdb.Flush(false)
@@ -117,7 +122,7 @@
require.Equal(t, memdb.memDB.Size(), 0)
for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(MinFlushKeys + i))
value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1)
value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
memdb.Set(key, value)
}
flushed, err = memdb.Flush(false)
@@ -144,7 +149,7 @@ func TestPipelinedFlushBlock(t *testing.T) {
})
for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(i))
value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1)
value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
memdb.Set(key, value)
}
flushed, err := memdb.Flush(false)
@@ -154,13 +159,13 @@
require.Equal(t, memdb.memDB.Len(), 0)
require.Equal(t, memdb.memDB.Size(), 0)

// When size of memdb is greater than ForceFlushSizeThreshold, Flush will be blocked.
// When size of memdb is greater than ForceFlushMemSizeThreshold, Flush will be blocked.
for i := 0; i < MinFlushKeys-1; i++ {
key := []byte(strconv.Itoa(MinFlushKeys + i))
value := make([]byte, ForceFlushSizeThreshold/(MinFlushKeys-1)-len(key)+1)
value := make([]byte, int(ForceFlushMemSizeThreshold/(MinFlushKeys-1))-len(key)+1)
memdb.Set(key, value)
}
require.Greater(t, memdb.memDB.Size(), ForceFlushSizeThreshold)
require.Greater(t, memdb.memDB.Mem(), ForceFlushMemSizeThreshold)
flushReturned := make(chan struct{})
oneSec := time.After(time.Second)
go func() {
@@ -191,7 +196,7 @@ func TestPipelinedFlushGet(t *testing.T) {
memdb.Set([]byte("key"), []byte("value"))
for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(i))
value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1)
value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
memdb.Set(key, value)
}
value, err := memdb.Get(context.Background(), []byte("key"))
@@ -214,7 +219,7 @@
blockCh <- struct{}{}
for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(i))
value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1)
value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
memdb.Set(key, value)
}
flushed, err = memdb.Flush(false)
@@ -237,7 +242,7 @@ func TestPipelinedFlushSize(t *testing.T) {
keys := 0
for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(i))
value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1)
value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
keys++
size += len(key) + len(value)
memdb.Set(key, value)
@@ -255,7 +260,7 @@

for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(MinFlushKeys + i))
value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1)
value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
keys++
size += len(key) + len(value)
memdb.Set(key, value)
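A note on the sizing arithmetic in the updated TestPipelinedFlushTrigger: because Mem() includes per-entry overhead that is hard to predict, the test shrinks the per-entry payload with the "cargo-culted" divisor of 3 so that MinFlushKeys entries keep Mem() below MinFlushMemSize, then switches to a divisor of 2 so that the raw payload is still only about half of MinFlushMemSize while Mem(), with overhead, is expected to cross it. The snippet below just makes those budgets explicit; it is not test code from the repository.

```go
package main

import "fmt"

const (
	minFlushKeys    uint64 = 10000
	minFlushMemSize uint64 = 16 * 1024 * 1024 // 16MB
)

func main() {
	// Divisor of 3: total payload (~5.6MB) leaves ample headroom below 16MB,
	// so Mem() stays under MinFlushMemSize even after internal overhead.
	small := int(minFlushMemSize/minFlushKeys) / 3
	// Divisor of 2: total payload (~8.4MB) is under 16MB on its own, but the
	// test expects Mem() plus overhead to exceed MinFlushMemSize while staying
	// below the 128MB force threshold.
	large := int(minFlushMemSize/minFlushKeys) / 2
	fmt.Printf("per-entry payload budgets: %d and %d bytes (raw budget %d)\n",
		small, large, minFlushMemSize/minFlushKeys)
	fmt.Printf("total payloads: %d and %d bytes vs MinFlushMemSize %d\n",
		uint64(small)*minFlushKeys, uint64(large)*minFlushKeys, minFlushMemSize)
}
```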
