Skip to content

Commit

Permalink
rsm: not to call ManagedStateMachine.Prepare() when saving dummy snap…
Browse files Browse the repository at this point in the history
…shots

Fixes #162
  • Loading branch information
lni committed Jan 31, 2021
1 parent b9784da commit d393277
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 12 deletions.
6 changes: 5 additions & 1 deletion internal/rsm/statemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,10 @@ func (s *StateMachine) setLastApplied(entries []pb.Entry) {
}
}

func (s *StateMachine) savingDummySnapshot(r SSRequest) bool {
return s.OnDiskStateMachine() && !r.Streaming() && !r.Exported()
}

func (s *StateMachine) checkSnapshotStatus(r SSRequest) error {
if s.aborted {
return sm.ErrSnapshotStopped
Expand Down Expand Up @@ -771,7 +775,7 @@ func (s *StateMachine) prepare(r SSRequest) (SSMeta, error) {
}
var err error
var ctx interface{}
if s.Concurrent() {
if s.Concurrent() && !s.savingDummySnapshot(r) {
ctx, err = s.sm.Prepare()
if err != nil {
return SSMeta{}, err
Expand Down
123 changes: 112 additions & 11 deletions internal/rsm/statemachine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2016,11 +2016,15 @@ func TestAlreadyAppliedInOnDiskSMEntryTreatedAsNoOP(t *testing.T) {
}

type testManagedStateMachine struct {
first uint64
last uint64
synced bool
nalookup bool
corruptIndex bool
first uint64
last uint64
synced bool
nalookup bool
corruptIndex bool
concurrent bool
onDisk bool
smType pb.StateMachineType
prepareInvoked bool
}

func (t *testManagedStateMachine) Open() (uint64, error) { return 10, nil }
Expand All @@ -2042,8 +2046,11 @@ func (t *testManagedStateMachine) Sync() error {
t.synced = true
return nil
}
func (t *testManagedStateMachine) GetHash() (uint64, error) { return 0, nil }
func (t *testManagedStateMachine) Prepare() (interface{}, error) { return nil, nil }
func (t *testManagedStateMachine) GetHash() (uint64, error) { return 0, nil }
func (t *testManagedStateMachine) Prepare() (interface{}, error) {
t.prepareInvoked = true
return nil, nil
}
func (t *testManagedStateMachine) Save(SSMeta,
io.Writer, []byte, sm.ISnapshotFileCollection) (bool, error) {
return false, nil
Expand All @@ -2056,9 +2063,9 @@ func (t *testManagedStateMachine) Offloaded() bool { return
func (t *testManagedStateMachine) Loaded() {}
func (t *testManagedStateMachine) Close() {}
func (t *testManagedStateMachine) DestroyedC() <-chan struct{} { return nil }
func (t *testManagedStateMachine) Concurrent() bool { return false }
func (t *testManagedStateMachine) OnDisk() bool { return false }
func (t *testManagedStateMachine) Type() pb.StateMachineType { return 0 }
func (t *testManagedStateMachine) Concurrent() bool { return t.concurrent }
func (t *testManagedStateMachine) OnDisk() bool { return t.onDisk }
func (t *testManagedStateMachine) Type() pb.StateMachineType { return t.smType }
func (t *testManagedStateMachine) BatchedUpdate(ents []sm.Entry) ([]sm.Entry, error) {
if !t.corruptIndex {
t.first = ents[0].Index
Expand All @@ -2068,7 +2075,6 @@ func (t *testManagedStateMachine) BatchedUpdate(ents []sm.Entry) ([]sm.Entry, er
ents[idx].Index = ents[idx].Index + 1
}
}

return ents, nil
}

Expand Down Expand Up @@ -2411,3 +2417,98 @@ func TestSetLastApplied(t *testing.T) {
}()
}
}

func TestSavingDummySnapshot(t *testing.T) {
tests := []struct {
smType pb.StateMachineType
streaming bool
export bool
result bool
}{
{pb.RegularStateMachine, true, false, false},
{pb.RegularStateMachine, false, true, false},
{pb.RegularStateMachine, false, false, false},
{pb.ConcurrentStateMachine, true, false, false},
{pb.ConcurrentStateMachine, false, true, false},
{pb.ConcurrentStateMachine, false, false, false},
{pb.OnDiskStateMachine, true, false, false},
{pb.OnDiskStateMachine, false, true, false},
{pb.OnDiskStateMachine, false, false, true},
}
for idx, tt := range tests {
sm := StateMachine{
onDiskSM: tt.smType == pb.OnDiskStateMachine,
}
var rt SSReqType
if tt.export && tt.streaming {
panic("bad test input")
}
if tt.export {
rt = Exported
} else if tt.streaming {
rt = Streaming
}
if r := sm.savingDummySnapshot(SSRequest{Type: rt}); r != tt.result {
t.Errorf("%d, got %t, want %t", idx, r, tt.result)
}
}
}

func TestPrepareIsNotCalledWhenSavingDummySnapshot(t *testing.T) {
tests := []struct {
onDiskSM bool
streaming bool
export bool
prepareInvoked bool
}{
{true, false, false, false},
{true, true, false, true},
{true, false, true, true},
{false, false, false, true},
{false, false, true, true},
}

for idx, tt := range tests {
msm := &testManagedStateMachine{
concurrent: true,
onDisk: tt.onDiskSM,
smType: pb.ConcurrentStateMachine,
}
if tt.onDiskSM {
msm.smType = pb.OnDiskStateMachine
}
m := &membership{
members: &pb.Membership{
Addresses: map[uint64]string{1: "localhost:1234"},
},
}
sm := StateMachine{
index: 100,
onDiskSM: tt.onDiskSM,
sm: msm,
members: m,
node: &testNodeProxy{},
sessions: NewSessionManager(),
}
var rt SSReqType
if tt.export && tt.streaming {
panic("bad test input")
}
if tt.export {
rt = Exported
} else if tt.streaming {
rt = Streaming
}
meta, err := sm.prepare(SSRequest{Type: rt})
if err != nil {
t.Errorf("prepare failed, %v", err)
}
if meta.Index != 100 {
t.Errorf("failed to get the snapshot metadata")
}
if msm.prepareInvoked != tt.prepareInvoked {
t.Errorf("%d, prepareInvoked got %t, want %t",
idx, msm.prepareInvoked, tt.prepareInvoked)
}
}
}

0 comments on commit d393277

Please sign in to comment.