Direction towards allocation-free operations #1184

Open
schreter opened this issue Jul 20, 2024 · 22 comments

@schreter
Collaborator

Currently, openraft allocates in many places, which is counterproductive, since any OOM will panic and tear down the state machine unnecessarily. We should look at ways to operate in constant memory.

Basically, we have the following parts/tasks that cooperate:

  • core task handling the current state
  • log task writing entries to the log
  • storage task applying logs to the state machine
  • replication tasks sending logs to other nodes
  • acceptor task (outside of openraft) receiving logs from the leader and pushing them to openraft

Now, all of these tasks basically operate on the same list of entries, which are either received from the client (on the leader) or replicated from the leader (on the follower).

The idea is to have one buffer which accepts logs, which are then pushed through the consensus (replication), log and state machine. This buffer could also be fixed-size, so it can accept all the Entry objects for the relevant logs and keep them until they are fully processed.

I.e., on the leader:

  • the entry enters the buffer as the next log entry, which causes accepted pointer to change
  • the log task watches accepted pointer and when it changes, starts writing the log and updates submitted pointer
  • after the log is written, flushed pointer is updated
  • replication tasks watch accepted pointer as well and send entries to the respective follower
  • after the replication is completed, the respective pointer for the follower is updated as well
  • the core task watches flushed pointer and the pointers from followers and computes the committed pointer
  • the core task updates to_apply pointer to the minimum of flushed and committed pointer (we cannot apply locally until the local log was written)
  • the storage task watches to_apply pointer and similar to log and replication tasks, applies log entries
  • after the log entry is applied, the applied pointer is updated and async client callback in the entry is called

The API for respective user code should provide entries as a pair of an async Stream and a notification object, where the stream internally awaits the pointer change and then feeds the entries from the buffer until this pointer, then awaiting the next change. When the needful is done, the notification object is informed about processed entries (not necessarily for each single entry). This notification object could be a wrapper over watch::Sender or similar for the first implementation.
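As an illustration only (not an openraft API; names like LogPointer and ConsumerState are made up), the pointer/notification pair could be sketched with tokio's watch channels roughly like this:

use tokio::sync::watch;

/// Monotonically increasing position in the shared entry buffer (illustrative).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct LogPointer(u64);

/// What a consumer task (log writer, replication, storage) holds.
struct ConsumerState {
    /// Advanced by the core task when new entries are accepted.
    accepted: watch::Receiver<LogPointer>,
    /// Advanced by the consumer after it has processed entries.
    progress: watch::Sender<LogPointer>,
}

impl ConsumerState {
    /// Await the next target past `done`; `None` means the channel was closed
    /// (leader change / shutdown) and the consumer should terminate.
    async fn next_target(&mut self, done: LogPointer) -> Option<LogPointer> {
        loop {
            let accepted = *self.accepted.borrow_and_update();
            if accepted > done {
                return Some(accepted);
            }
            self.accepted.changed().await.ok()?;
        }
    }

    /// Report that everything up to `done` has been processed.
    fn report(&self, done: LogPointer) {
        let _ = self.progress.send(done);
    }
}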

On the follower:

  • there is no RPC API, rather a connection API from the leader which feeds it log entries (as described above)
  • upon connecting, typically, a task is started by the user code consuming these log entries from the network (user-specific)
  • for each incoming log entry, the entry is pushed to openraft into the same buffer as the client write would put it (unless it's already there or known to be committed - when the leader retries after disconnect/reconnect)
  • accepted pointer will change, causing the log to be written
  • after the log is written, flushed pointer will change, which can be exposed for example as a watch::Receiver to the task handling incoming entries
  • the task handling incoming entries will send the flushed state back in an implementation-specific way
  • the advantage is that receive and send part of the connection can be also handled independently

What about elections?

  • the raft core task decides to send a VoteRequest upon a timeout
  • the VoteRequest is sent via a dedicated method, basically similar to what we have today
  • on the remote node, the VoteRequest received by the user-specific task is stored in peer's state and the raft core task is woken up (on a Watch, for example)
  • the raft core task processes the Vote change and sends the VoteReply back over an oneshot channel or similar to the requestor user task, which sends it back over the network

What happens upon leader change?

  • the storage task has no problem, since it only operates on committed entries and thus can continue as-is without any interruption
  • the log write task needs to be notified to truncate the log before continuing (this can be handled by closing the watch::Sender sending the accepted pointer, after which the task is reinitialized/restarted and first truncates the log)
  • the replication tasks are started or terminated as needed due to the status change (this still can OOM, but let's handle that later)
  • the core task continues running, dropping all entries in the buffer past the truncation point before restarting logging and replication

What about the snapshot?

  • raft core task: decides about snapshot need and sets snapshot_needed flag
  • the storage task will trigger the snapshot upon reading the state the next time (e.g., by interspersing SnapshotRequest in the Stream feeding entries to the storage task)
  • when the snapshot is finished, the core state is updated with the new snapshot log ID
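Illustratively (not current openraft types), the interspersed requests fed to the storage task could be modeled as a small enum, so applying entries and building snapshots share one ordered stream:

/// Illustrative request type for the stream driving the storage task.
enum StorageTask {
    /// Apply committed entries up to this log index.
    ApplyUpTo(u64),
    /// Build a snapshot covering everything applied so far.
    BuildSnapshot,
}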

What about purging the log?

  • raft core task: requests snapshots and computes the purged pointer as usual
  • log task: we can store purged and accepted in a common state object, so the log task can do the needful
  • similar to storage task, the Stream reading entries for the log can intersperse them with Purge requests

So we have following state to watch:

  • core task state, which includes the log flushed pointers of individual followers (also local one), including their votes, the applied pointer and snapshot_log_id
  • log task state, which includes accepted and purged pointers (in one state object)
  • storage task state, watching to_apply pointer and snapshot_needed flag (in one state object)
  • replication task states, watching accepted pointer

All tasks read entries from the shared buffer up to accepted, only the raft core task adds entries to the buffer and updates accepted afterwards, so the Rust AXM (aliasing XOR mutability) rules are followed. This will likely need a bit of unsafe code to implement the buffer optimally.

The raft core task listens on multiple event sources:

  • the core task state, which also includes voting requests
  • a bounded channel sending requests from the client, as long as there is space in the buffer (so we don't accept more than the size of the buffer before the entries are processed by the log/storage/replication/snapshot)

Since the watch state for the core task is relatively large and frequently updated, I'd implement it a bit differently - push most of the individual requests via an atomic change to the state member (up to 16B can be updated in one go, which should be sufficient for most items) and a simple signalization semaphore to ensure the task is running or will wake up after the update. The same goes for other signalization - for example, signaling the log task needs two 8B values, so it can also be done atomically by a simple atomic write to the respective value followed by a task wakeup.
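As a rough sketch of that signalling scheme (hypothetical types, not openraft code), each 8-byte value can be published with a plain atomic store and the task woken via a notification primitive:

use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Notify;

/// Hypothetical signalling state shared between the core task and the log task.
#[derive(Default)]
struct LogTaskSignal {
    accepted: AtomicU64,
    purged: AtomicU64,
    wakeup: Notify,
}

impl LogTaskSignal {
    /// Core task side: publish a new accepted index and wake the log task.
    fn advance_accepted(&self, index: u64) {
        self.accepted.store(index, Ordering::Release);
        self.wakeup.notify_one();
    }

    /// Log task side: wait for a wakeup, then read the current targets.
    async fn wait(&self) -> (u64, u64) {
        self.wakeup.notified().await;
        (
            self.accepted.load(Ordering::Acquire),
            self.purged.load(Ordering::Acquire),
        )
    }
}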

With this, we have no unbounded channels or other unbounded allocations in openraft itself (except for leader change, where we restart replication tasks - that needs to be addressed separately). It's up to the user to implement the handling in an OOM-safe way.

I hope this all is somewhat understandable. We can start implementing it also piecewise - for example, first handle the local log writing and state machine update in this way (which can by default call legacy methods easily or alternatively we can provide a wrapper calling legacy interface to allow for a smooth transition). Then, continue with replication tasks.

Comments? Did I forget about something?


@drmingdrmer
Member

I have not reviewed it thoroughly yet, but there are a few design aspects that
conflict with the current codebase, AFAIK:

In general, a watch notification may not be suitable for the current
architecture because watch does not guarantee the exact order of events. For
example, if Openraft updates submitted before actually submitting IO to
RaftLogStorage, it is possible that the watch::Receiver notifies the user
before the IO is submitted. Conversely, if IO is submitted before updating
submitted, the callback to RaftLogStorage::append(..., callback) might be
called before submitted is updated.

  • The idea is to have one buffer which accepts logs, which then are pushed through the consensus (replication), log and state machine. This buffer could be also fixed-size, so it can accept all the Entry objects for the relevant logs and keep them until they are fully processed.

It is not feasible to keep Entry objects until they are fully processed. If a
follower is offline, the Leader should not retain the Entries that are not sent
to the Follower but release the memory as soon as the Entry is committed.
Further replication should read the log entries from the storage.

I.e., on the leader:
replication tasks watch accepted pointer as well and send entries to the respective follower

There is a problem here because watch may yield an obsolete value. When the
replication task receives an accepted value like Leader-2, log-id-3, the
buffer may have been overwritten by a new leader such as Leader-3, log-id-3'
(a different log-id-3). The replication task may read changed log entries.
That’s why in the current implementation, the logs for replication are
determined by RaftCore.

  • the core task updates to_apply pointer to the minimum of flushed and committed pointer (we cannot apply locally until the local log was written)

A log entry that is submitted is visible to RaftLogReader, thus the state
machine should be able to apply a submitted but not yet flushed log entry. Why
does the state machine need the log entry to be flushed before applying it?

On the follower:

  • for each incoming log entry, the entry is pushed to openraft into the same buffer as the client write would put it (unless it's already there or known to be committed - when the leader retries after disconnect/reconnect)

The follower's buffer may be truncated if the Leader switches. Such an approach
requires the follower API to handle log truncation directly, which may be
somewhat counterintuitive.

@schreter
Collaborator Author

In general, a watch notification may not be suitable for the current
architecture because watch does not guarantee the exact order of events.

I'm using watch as a placeholder. Nevertheless, I'm pretty sure it should be suitable in general (though it has its own problems). When you post a value, this value will be definitely received by the receiver. There will be no missed reads. Of course, posting a new value overwrites the previous one, but that's OK, we just want to advance pointers.

Also, submitted is probably a wrong name here. OTOH, what's wrong with notifying replication tasks to replicate a log entry which is NOT yet submitted into the local log? The only thing which needs to wait both for the quorum and for the local log persistence is apply, since otherwise we'd have an inconsistent state machine at restart. But maybe we mean different things here?

It is not feasible to keep Entry objects until they are fully processed. If a follower is offline, the Leader should not retain the Entries that are not sent to the Follower but release the memory as soon as the Entry is committed.

Agreed. But, that's also not a problem, IMHO. The entries don't strictly need to be kept longer than they are today. Today we also keep entries around until they are replicated, unless the follower is lagging too much (at the latest, the log implementation does that, since otherwise it would be very inefficient). If the entries are gone they can still be loaded from the log. Anyway, barring offline followers, the entries are processed rather quickly.

Another option is to simply delegate the entry buffer to the log implementation, basically what we have today. Then, the log implementation can force the slow down of the processing by simply blocking append to the log (and thus overflowing the bounded channel directing requests to the log, which would block further requests). I didn't think about exact design here, though.

My point is, we don't need to collect batches of entries in vectors, we just need to update pointers to the log/entry buffer to instruct the state machine and/or replication what to do. And, if we just pass it the LogReader, they can use the LogReader directly to do their own batching (or short-cut the LogReader and directly access the application-specific log in a more efficient way).

There is a problem here because watch may yield an obsolete value. [...]. The replication task may read changed log entries.

Correct. That's a problem which needs to be solved. A trivial solution I outlined above is simply stopping the replication task (closing the channel) before writing further log after leader change. This guarantees that the watch::Receiver after closing the channel will return with an error and the task can safely end itself. Joining the replication task handle afterwards guarantees that no overwritten entries will be read (since they will only be overwritten after joining it).

However, this would require awaiting joining of the replication task in raft core, which is counterproductive. Your point above with the log entry buffer being the problem is taken. So if we use the log as we use it today, the replication task would try to read the log, but the log was already asked to truncate itself. So the user-specific log implementation needs to resolve this properly (as it is today, no change in log storage concepts).

A log entry that is submitted is visible to RaftLogReader, thus the state machine should be able to apply a submitted but not yet flushed log entry. Why does the state machine need the log entry to be flushed before applying it?

Strictly speaking, it does not. But the problem arises if the process is killed after applying the entry to the state machine, but before flushing the log entry. Restarting the process will find applied log ID > flushed log ID, which might lead to issues. Isn't there also somewhere an assertion regarding this in openraft?

The follower's buffer may be truncated if the Leader switches. Such an approach requires the follower API to handle log truncation directly, which may be somewhat counterintuitive.

Why? If the leader dies, all followers either have the newest (uncommitted) entries or do not. The followers then hold an election and the one having the longest log will win and replicate it to others. So there is no truncation on the follower.

When the original leader rejoins (as a follower) and it has yet another (uncommitted, but flushed) log entry, which has NOT been replicated yet (and thus is unknown to all former followers), then this node needs to truncate the log. This will be necessary no matter what, it must be done also today.

Or am I missing something here?

Thanks.


Based on the discussion so far, I'd modify the approach outlined above by removing the explicit entry buffer and delegating it to the log implementation. I.e., the log implementation is responsible for any optimizations of reading log items and ideally keeping them in memory for sufficient time.

Still, the communication between tasks would be modified from unbounded channels to simply communicating pointers/simple state over watch-like channels (not necessarily the "standard" watch - we can optimize here) and/or bounded channels (e.g., for client_write).

@drmingdrmer
Member

I'm using watch as a placeholder. Nevertheless, I'm pretty sure it should be suitable in general (though it has its own problems). When you post a value, this value will be definitely received by the receiver. There will be no missed reads. Of course, posting a new value overwrites the previous one, but that's OK, we just want to advance pointers.

The problem is that after setting the submitted watch::Sender, you cannot prevent the consumer (such as the log replicator) from seeing it. But at this point in time, the IO may not have been actually submitted yet. It is not about missing a message.

Also, submitted is probably a wrong name here. OTOH, what's wrong with notifying replication tasks to replicate a log entry which is NOT yet submitted into the local log? The only thing which needs to wait both for the quorum and for the local log persistence is apply, since otherwise we'd have an inconsistent state machine at restart. But maybe we mean different things here?

I'm talking about the current Openraft implementation. Logs that are submitted to RaftLogStorage are visible via LogReader. accepted means the log entries are stored in memory and have not yet reached RaftLogStorage.

This is because Openraft aims to separate the logic from the IO operations. When input messages are received, it simply places the IO commands into the output queue instead of executing them immediately. This approach allows for the reordering or batching of IO commands, enabling more efficient processing.

When an IO command is placed into the output queue, it is considered accepted (not yet visible to the reader). When the command is executed, such as when commands are pushed to the RaftLogStorage, it is considered submitted (becomes visible to the reader). Finally, when the callback is invoked, the IO is considered flushed (always visible upon restart).
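Summarized as a small illustrative enum (not openraft code), the three IO states are:

/// Illustrative only: progress states of a single IO command.
enum IoProgress {
    /// Placed into the output queue; not yet visible to readers.
    Accepted,
    /// Pushed to RaftLogStorage; visible via RaftLogReader.
    Submitted,
    /// Storage callback invoked; guaranteed visible after restart.
    Flushed,
}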

@drmingdrmer
Member

It is not feasible to keep Entry objects until they are fully processed. If a follower is offline, the Leader should not retain the Entries that are not sent to the Follower but release the memory as soon as the Entry is committed.

Agreed. But, that's also not a problem, IMHO. The entries don't strictly need to be kept longer than they are today. Today we also keep entries around until they are replicated, unless the follower is lagging too much (at the latest, the log implementation does that, since otherwise it would be very inefficient). If the entries are gone they can still be loaded from the log. Anyway, barring offline followers, the entries are processed rather quickly.

Another option is to simply delegate the entry buffer to the log implementation, basically what we have today. Then, the log implementation can force the slow down of the processing by simply blocking append to the log (and thus overflowing the bounded channel directing requests to the log, which would block further requests). I didn't think about exact design here, though.

Yes, the current implementation uses RaftLogStorage as the log entry buffer. This way the application can manage memory for committed log entries and choose when to evict them in case there is a slow Follower.

My point is, we don't need to collect batches of entries in vectors, we just need to update pointers to the log/entry buffer to instruct the state machine and/or replication what to do. And, if we just pass it the LogReader, they can use the LogReader directly to do their own batching (or short-cut the LogReader and directly access the application-specific log in a more efficient way).

This is almost (exactly?) the same as the way Openraft does it now.

@drmingdrmer
Member

There is a problem here because watch may yield an obsolete value. [...]. The replication task may read changed log entries.

Correct. That's a problem which needs to be solved. A trivial solution I outlined above is simply stopping the replication task (closing the channel) before writing further log after leader change. This guarantees that the watch::Receiver after closing the channel will return with an error and the task can safely end itself. Joining the replication task handle afterwards guarantees that no overwritten entries will be read (since they will only be overwritten after joining it).

However, this would require awaiting joining of the replication task in raft core, which is counterproductive. Your point above with the log entry buffer being the problem is taken. So if we use the log as we use it today, the replication task would try to read the log, but the log was already asked to truncate itself. So the user-specific log implementation needs to resolve this properly (as it is today, no change in log storage concepts).

This is the way Openraft does it. The problem is that joining and then rebuilding replication streams introduces a small delay.

One solution is to let RaftLogReader::try_get_log_entries() return a stream of (Vote, Entry) tuples.
The Vote is the vote value stored in RaftLogStorage when the Entry is returned.
This way a leadership change event can be perceived by the replication task.
When the Vote changes, the replication task should quit at once.
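A hedged sketch of what such a signature might look like (names, generics, and error types are illustrative stand-ins for the openraft ones, not the current API):

use futures::stream::BoxStream;
use std::ops::RangeInclusive;

/// Illustrative: a streaming log reader that tags each entry with the Vote
/// stored in the log storage at the time the entry was read.
trait StreamingLogReader {
    async fn stream_log_entries(
        &mut self,
        range: RangeInclusive<u64>,
    ) -> Result<BoxStream<'_, Result<(Vote, Entry), StorageError>>, StorageError>;
}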

@drmingdrmer
Member

A log entry that is submitted is visible to RaftLogReader, thus the state machine should be able to apply a submitted but not yet flushed log entry. Why does the state machine need the log entry to be flushed before applying it?

Strictly speaking, it does not. But the problem arises if the process is killed after applying the entry to the state machine, but before flushing the log entry. Restarting the process will find applied log ID > flushed log ID, which might lead to issues. Isn't there also somewhere an assertion regarding this in openraft?

In the current code base, it is possible for the applied log ID to be greater than the flushed log ID. For example, when a snapshot is installed, the state machine is entirely replaced, and the last log ID may lag behind the snapshot's last log ID.

Openraft has already dealt with these cases.

@drmingdrmer
Member

The follower's buffer may be truncated if the Leader switches. Such an approach requires the follower API to handle log truncation directly, which may be somewhat counterintuitive.

Why? If the leader dies, all followers either have the newest (uncommitted) entries or do not. The followers then hold an election and the one having the longest log will win and replicate it to others. So there is no truncation on the follower.

When the original leader rejoins (as a follower) and it has yet another (uncommitted, but flushed) log entry, which has NOT been replicated yet (and thus is unknown to all former followers), then this node needs to truncate the log. This will be necessary no matter what, it must be done also today.

What I mean is that using a log entry buffer requires the log to be truncated at a very early stage,
because the log entries can only be put into the buffer if there is room for them.
But truncating logs is done by RaftLogStorage::truncate, which means you have to actually execute truncate() at once. With the current implementation, however, there may be pending IO such as RaftLogStorage::append() in the queue waiting to be executed.

@schreter
Collaborator Author

Oh, a lot of comments...

The problem is that after setting the submitted watch::Sender, you cannot prevent the consumer (such as the log replicator) from seeing it. But at this point in time, the IO may not have been actually submitted yet. It is not about missing a message.

Sure, we can only inform the consumer after the entry is actually written into the log/log buffer. As I mentioned, I think the term I used is wrong (accepted/submitted).

This is because Openraft aims to separate the logic from the IO operations.

That's right. And my intention is to optimize it :-).

When input messages are received, it simply places the IO commands into the output queue instead of executing them immediately. This approach allows for the reordering or batching of IO commands, enabling more efficient processing.

and

This is almost (exactly?) the same as the way Openraft does it now.

Correct. But these queues are unbounded, so it's a problem to run in constant memory. Also, the batching is at the moment NOT at the storage/log level, but rather at the task driving the storage/log and the batches constructed by openraft are then sent to the user-specific code. This on one side allocates and adds potential latency and on the other side makes optimization much more complex.

We have multiple use cases here. Let's look first at the very simplest one, which is the storage implementation doing apply. Currently, openraft reads the entries to apply from the log, sends them to the user-specific code, then awaits it/potentially awaits async I/O via callbacks. This works in principle, but it's inefficient.

My suggestion is NOT to pass entries, but rather to only pass the log pointer (committed log ID) to the user and consume the log pointer (applied log ID) from the user. I.e., there are no "commands", but rather there is a simple watch on the committed log ID wrapped suitably. Let the user code deal with reading what is necessary from the log (current APIs are all allocating ones; streaming might help).

Also, let the user code apply the entries read from the log (most likely from memory) and then just announce applied log ID to openraft via watch-like channel (i.e., updating applied log ID).

Similarly, for snapshots, the watch on committed log ID could also carry a flag/counter requesting a snapshot. No additional channel/no command channel. When the snapshot is done, again, it is simply announced to the Raft core via a watch-like channel.

With that, on the one hand, the storage implementation has the full flexibility to implement any I/O optimizations, etc., and the communication between Raft core and the storage task is in constant memory.

The replication tasks work essentially the same way, at least for append entries.

One solution is to let RaftLogReader::try_get_log_entries() return a stream of (Vote, Entry) tuples. The Vote is the vote value stored in RaftLogStorage when the Entry is returned. This way a leadership change event can be perceived by the replication task. When the Vote changes, the replication task should quit at once.

Yes, that's a possibility. But, term would be sufficient for "regular" Raft (i.e., CommittedLeaderId for the general case). And, this is already part of LogId, which is read from the log and member of Entry. So no additional information is necessary, it's already there.

What I mean is that using a log entry buffer requires the log to be truncated at a very early stage, because the log entries can only be put into the buffer if there is room for them. But truncating logs is done by RaftLogStorage::truncate, which means you have to actually execute truncate() at once. With the current implementation, however, there may be pending IO such as RaftLogStorage::append() in the queue waiting to be executed.

OK, as mentioned, we can simply go with log storage as the buffer (as it is today in openraft). Then, we don't need a limited buffer and from the PoV of openraft the buffer is unlimited. It is then up to the application to do admission control somewhere (or openraft needs to add admission control).


Where I don't have a good idea yet is how to run the communication from the client, through the raft core task, to the log writer in constant memory. Communication from the client to the raft core task can be done using a bounded buffer, but that helps only partially. Writing to the log is still unbounded and can overflow and/or block the raft core task if we'd use a bounded command buffer from raft core to the log writer (which is a no-go). The same holds for append entries on the follower.

I was thinking about short-cutting this to have only one bounded queue from client write/append entries directly to the log write task and log write task informing the raft core about accepted entries. Since we anyway need to deal with election and truncating the log, the log write task could simply poll both the entry stream from the user (either client write or append entries) and the watch from the raft core task informing it about new election point (via the truncation LogId, which contains a new CommittedLeaderId).

When the watch returns new leader elected, then the log writing task would simply truncate the log to the log index from the new leader. Depending on the state (we are the leader or not), from now on, only entries received from the correct source (client write or append log) would be accepted, the other ones redirected to leader/discarded.

Again, this concept would allow running in constant memory.

There are still some other things like voting, but those can be also handled in constant memory fairly easily.

@drmingdrmer
Member

Correct. But these queues are unbounded, so it's a problem to run in constant memory. Also, the batching is at the moment NOT at the storage/log level, but rather at the task driving the storage/log and the batches constructed by openraft are then sent to the user-specific code. This on one side allocates and adds potential latency and on the other side makes optimization much more complex.

Agreed.

We have multiple use cases here. Let's look first at the very simplest one, which is the storage implementation doing apply. Currently, openraft reads the entries to apply from the log, sends them to the user-specific code, then awaits it/potentially awaits async I/O via callbacks. This works in principle, but it's inefficient.

My suggestion is NOT to pass entries, but rather to only pass the log pointer (committed log ID) to the user and consume the log pointer (applied log ID) from the user. I.e., there are no "commands", but rather there is a simple watch on the committed log ID wrapped suitably. Let the user code deal with reading what is necessary from the log (current APIs are all allocating ones; streaming might help).

It's acceptable to pass a log pointer to the StateMachine instead of log entries. However, pushing commands makes it easier to manage multiple types of operations, such as applying log entries, building a snapshot, or retrieving a snapshot.
Some of these commands may require a strict order. (I'm not quite sure).

Generally speaking, the watch-driven architecture could work, but there are quite a few order-related details to consider.

I'm going to rewrite the part where the SM responds with the apply result to RaftCore to use a watcher.

Also, let the user code apply the entries read from the log (most likely from memory) and then just announce applied log ID to openraft via watch-like channel (i.e., updating applied log ID).

Good.

Similarly, for snapshots, the watch on committed log ID could also carry a flag/counter requesting a snapshot. No additional channel/no command channel. When the snapshot is done, again, it is simply announced to the Raft core via a watch-like channel.

With that, on the one hand, the storage implementation has the full flexibility to implement any I/O optimizations, etc., and the communication between Raft core and the storage task is in constant memory.

Good.

The replication tasks work essentially the same way, at least for append entries.

Good.

Yes, that's a possibility. But, term would be sufficient for "regular" Raft (i.e., CommittedLeaderId for the general case). And, this is already part of LogId, which is read from the log and member of Entry. So no additional information is necessary, it's already there.

The Vote (or term in Raft) is necessary.
I have written a short explanation of it:
https://github.com/datafuselabs/openraft/blob/ee460f376561a3a36bbd11244aa5846b207345d3/openraft/src/raft_state/io_state/log_io_id.rs#L8-L23

OK, as mentioned, we can simply go with log storage as the buffer (as it is today in openraft). Then, we don't need a limited buffer and from the PoV of openraft the buffer is unlimited. It is then up to the application to do admission control somewhere (or openraft needs to add admission control).

Good. This should be feasible when all of the RaftLogStorage methods are
upgraded to being callback-based.

Where I don't have a good idea yet is how to run the communication from the client, through the raft core task, to the log writer in constant memory. Communication from the client to the raft core task can be done using a bounded buffer, but that helps only partially. Writing to the log is still unbounded and can overflow and/or block the raft core task if we'd use a bounded command buffer from raft core to the log writer (which is a no-go). The same holds for append entries on the follower.

It is indeed a problem unless RaftLogStorage::append() returns an error indicating the buffer is full, allowing RaftCore to refuse a client write request.

I was thinking about short-cutting this to have only one bounded queue from client write/append entries directly to the log write task and log write task informing the raft core about accepted entries. Since we anyway need to deal with election and truncating the log, the log write task could simply poll both the entry stream from the user (either client write or append entries) and the watch from the raft core task informing it about new election point (via the truncation LogId, which contains a new CommittedLeaderId).

Yes, an exclusive single client would be a solution.

When the watch returns new leader elected, then the log writing task would simply truncate the log to the log index from the new leader. Depending on the state (we are the leader or not), from now on, only entries received from the correct source (client write or append log) would be accepted, the other ones redirected to leader/discarded.

Again, this concept would allow running in constant memory.

There are still some other things like voting, but those can be also handled in constant memory fairly easily.

Makes sense.

@schreter
Collaborator Author

It's acceptable to pass a log pointer to the StateMachine instead of log entries. However, pushing commands makes it easier to manage multiple types of operations, such as applying log entries, building a snapshot, or retrieving a snapshot.

Right, pushing commands does have its advantages :-). But it can likely be handled by different means.

Some of these commands may require a strict order. (I'm not quite sure).

Well, instead of pushing individual commands, one can send the required state. This also has the huge advantage that it would "cancel" commands which no longer make sense to execute. So in the end, it is likely a win-win scenario.

We just need to ensure watch is implemented optimally for partial state updates (but I can take care of that, we can reuse stuff from our project).

Generally speaking, the watch-driven architecture could work, but there are quite a few order-related details to consider.

Oh yes, I know.

For one, the current architecture requires passing a Responder to send the client reply. This channel is of course not in the log. In our project, it's not a big deal, since the reply to the client is done anyway directly on the state machine level, not sent back via two tasks via openraft (this is correct, since only replies to committed commands are sent and any commands re-executed after restart will send same replies - we have a reply cache).

Possibly, some similar concept could be used here. For example, do not send Entry + Responder, but rather embed the Responder in the Entry, i.e., add Entry::request_completed(self, response: WriteResult), which would do the needful. This can be also mapped to the current Responder, of course. For entries read from the log, this Responder would be either None or an empty one, as the application needs. And, it is the job of the application not to lose the Responder for not yet applied operations.
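A hypothetical sketch of that shape (application-side, not an openraft type; WriteResult stands in for the application's reply type):

use tokio::sync::oneshot;

/// Hypothetical application entry carrying an optional reply channel.
struct AppEntry {
    payload: Vec<u8>,
    /// Present only for entries originating from a local client write;
    /// None for entries re-read from the log (follower, restart).
    responder: Option<oneshot::Sender<WriteResult>>,
}

impl AppEntry {
    /// Called by the state machine once the entry has been applied.
    fn request_completed(self, response: WriteResult) {
        if let Some(tx) = self.responder {
            // A dropped receiver just means the client went away.
            let _ = tx.send(response);
        }
    }
}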

@schreter
Collaborator Author

Ah, forgot to comment on this one:

The Vote (or term in Raft) is necessary.

Yes, but what is a Vote? It is a term or a term + leader ID. Which is exactly what we have in current LogId, isn't it?

@drmingdrmer
Member

Ah, forgot to comment on this one:

The Vote (or term in Raft) is necessary.

Yes, but what is a Vote? It is a term or a term + leader ID. Which is exactly what we have in current LogId, isn't it?

The log id has the id of the Leader that proposed this log. Not the one that replicates this log.

A log can be proposed by Leader-1 and replicated by Leader-2.

LogId(term, node-id, index) is not monotonic to RaftLogStorage::append(): the same log id could be appended more than once. Thus LogId can not be used as an IO pointer.

An IO pointer must be monotonic increasing, (LeaderVote, LogId) is monotonic increasing. The LeaderVote in it represents the Leader that writes the log to local storage or replicates the log to a remote node.

Thus a complete IOId is defined as (WritingLeader(term, node_id), LogId(term, node_id, index)).
For example, a valid IOId could be: WritingLeader(term=3, node_id=2), LogId(term=2, node_id=1, index).
And the IOId.WritingLeader is always greater than or equal to the IOId.LogId.leader_id.
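In other words (illustrative struct only, mirroring the IOId described above):

/// Illustrative shape of the IO pointer: the vote of the leader performing
/// the write, plus the log id of the entry being written.
struct IoId {
    /// (term, node_id) of the leader writing/replicating the entry.
    writing_leader: Vote,
    /// (term, node_id, index) of the entry itself; its leader_id is <= writing_leader.
    log_id: LogId,
}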

@schreter
Collaborator Author

Thus a complete IOId is defined as (WritingLeader(term, node_id), LogId(term, node_id, index)).

OK, thanks.

The problem is, if we push the log to the writer via short circuit, the Vote is unknown. I will have to ponder it a bit later, now the work calls (meetings :-( ).

@schreter
Collaborator Author

So, back to the topic...

LogId(term, node-id, index) is not monotonic to RaftLogStorage::append(): the same log id could be appended more than once. Thus LogId can not be used as an IO pointer.

I don't quite get it. Yes, the same index can be appended more than once, with different term, if the leader changes and the log is thus truncated (or in case of the optimization, instead of term it would be CommittedLeaderId which subsumes term + node ID).

Thus a complete IOId is defined as (WritingLeader(term, node_id), LogId(term, node_id, index)).

Yes, but how can the writing leader/Vote term differ from the one in LogId, except for lower indices replicated by the leader for log entries committed before the writing leader became the leader? For this case, all these entries with an older term must be already committed, so they will never ever be truncated out again.

We can only truncate that portion of the log, which was written after we became leader (and managed to persist something in the log, which isn't committed by the quorum and will be truncated out when we become follower).

Basically, a truncate request is always a new LogId with a new term and a lower index. From this point on, any committed entries will have this term (which is higher than any term written previously for any index past this point). Even if this new leader appends something and is out-voted later by yet higher term, this still holds.

So I believe it's sufficient to work like this:

  • An append request comes in with a certain log_id.
  • If the log_id is naturally contiguous to the log (i.e., with log_id.index == last_log_id.index + 1 and log_id.term >= last_log_id.term), then accept it as-is and append it.
  • If the log_id.index <= last_log_id.index, then term > last_log_id.term (a new leader has been elected). In this case, truncate the log to this log_id and then append a new entry with this new log_id.
  • In all the other cases refuse the append due to inconsistency (gap in indices, term going down, etc.).

For the optimized leader election, replace term with CommittedLeaderId.
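A small sketch of that decision rule (hypothetical types; a plain term stands in for CommittedLeaderId in the optimized case):

/// Illustrative outcome of an append request against the current log tail.
enum AppendDecision {
    /// Contiguous with the tail: just append.
    Append,
    /// A new leader overwrites a suffix: truncate at `index`, then append.
    TruncateThenAppend { index: u64 },
    /// Gap in indices, decreasing term, etc.: refuse.
    Reject,
}

fn decide(last_term: u64, last_index: u64, new_term: u64, new_index: u64) -> AppendDecision {
    if new_index == last_index + 1 && new_term >= last_term {
        AppendDecision::Append
    } else if new_index <= last_index && new_term > last_term {
        AppendDecision::TruncateThenAppend { index: new_index }
    } else {
        AppendDecision::Reject
    }
}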

Am I missing something?

@drmingdrmer
Member

Let me take an example to explain what could happen to a certain log id:

Scenario: LogId Truncated and Appended Multiple Times

Consider a scenario where a specific LogId is truncated and appended more than once.

The following discussion is in Raft terminology, a term and LogId are represented as (term, index).

Ni: Node
i-j: log with term i and index j

N1 | 1-1 1-2
N2 | 1-1 1-2
N3 | 3-1
N4 | 4-1
N5 |
N6 |
N7 |
-------------------------> log index

Given the initial cluster state built with the following steps:

  1. N1 as Leader:
    • N1 established itself as the leader with a quorum of N1, N5, N6, N7.
    • N1 appended logs 1-1 and 1-2, replicating only to N2.
  2. N3 as Leader:
    • N3 became the leader with a quorum of N3, N5, N6, N7.
    • N3 appended log 3-1 but failed to replicate any logs to other nodes.
  3. N4 as Leader:
    • N4 became the leader with a quorum of N4, N5, N6, N7.
    • N4 appended log 4-1 but also failed to replicate any logs.

As a result, N1's log will be truncated and appended multiple times:

  • Initial State:
    • N1's term and logs are 1; 1-1, 1-2.
  • N3 as Leader at Term 5:
    • N3 established itself as the leader with a quorum of N3, N5, N6, N7.
    • N3 truncated all logs on N1 and replicated log 3-1 to N1 before crashing.
    • N1's term and log become 5; 3-1.
  • N2 as Leader at Term 6:
    • N2 established itself as the leader with a quorum of N2, N5, N6, N7.
    • N2 truncated all logs on N1 and replicated logs 1-1, 1-2 to N1 before crashing.
    • N1's term and logs become 6; 1-1, 1-2.
  • N4 as Leader at Term 7:
    • N4 established itself as the leader with a quorum of N4, N5, N6, N7.
    • N4 truncated all logs on N1 and replicated log 4-1 to N1 before crashing.
    • N1's term and log become 7; 4-1.

This scenario can repeat, where a log entry with the same LogId is truncated and appended multiple times if there are enough nodes in the cluster.

However, it's important to note that a specific LogId can only be truncated and appended by a leader with a higher term.

Therefore, the pointer for an IO operation must be represented as (term, LogId(term, index)).

@schreter
Collaborator Author

Therefore, the pointer for an IO operation must be represented as (term, LogId(term, index)).

Uff, quite a complex example :-). But I understand what you mean. Nevertheless, it should be fairly easy to implement it as well. Basically, we have two sources feeding entries into the log writer:

  • the leader (from client_write)
  • the append entry receiver (on the follower)

In both cases, at the time the entry would be put to the log buffer the Vote is known (leader term for simplicity). So we can indeed send the entry to the (bounded) log write channel identified as (term, log_id), where the term is the one of the known leader.

The log write task would truncate the log to the log_id upon seeing a higher term and ignore entries with older term. With that, it's still simple enough and can be run in constant memory.
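As a rough, hypothetical sketch (Entry and LogWriter are placeholders, not openraft APIs), the log write task could look roughly like this:

use tokio::sync::mpsc;

/// Hypothetical log-write loop: entries arrive over a bounded channel tagged
/// with the term of the leader that sent them; a higher term truncates first,
/// an older term is ignored.
async fn log_write_loop(mut rx: mpsc::Receiver<(u64, Entry)>, mut log: impl LogWriter) {
    let mut current_term = 0u64;
    while let Some((term, entry)) = rx.recv().await {
        if term < current_term {
            continue; // stale entry from an old leader; drop it
        }
        if term > current_term {
            current_term = term;
            // New leader: truncate the conflicting suffix before appending.
            log.truncate_from(entry.index).await;
        }
        log.append(entry).await;
    }
}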

Or am I still missing something?

@drmingdrmer
Member

It looks like a working approach. But I cannot say for sure for now. It involves too many modifications to the codebase.

Can you give some of the top-level API signatures, such as for the Follower to accept AppendEntries?

@schreter
Collaborator Author

schreter commented Aug 2, 2024

Sorry for the delay. First, let's look at replication. My suggestion would be along these lines:

trait RaftNetworkFactory<C> {
    /// Create a replication stream streaming log entries to a remote node.
    fn new_replication_stream(
        &mut self,
        target: C::NodeId,
        node: C::Node,
    ) -> impl RaftReplicationStream {
        DefaultReplicationStream(self.new_client(target, node))
    }
}

/// State of the replication for a replication stream.
trait RaftReplicationState {
    /// Get the next target log ID to replicate up to, the current commit ID
    /// and the current Vote to replicate to the follower.
    async fn next_request(&self) -> Result<(LogId, LogId, Vote), ReplicationClosed>;

    /// Report progress of the replication to the leader.
    fn report_progress(&self, flushed_up_to: LogId) -> Result<(), ReplicationClosed>;
}

/// Replication stream.
trait RaftReplicationStream {
    /// Run the replication loop.
    ///
    /// Read entries from the `log_reader` and send them to the `target`.
    /// Use the `state` object to get the new log ID to replicate to and to report progress.
    async fn replicate(
        self,
        log_reader: impl RaftLogReader,
        start_log_id: LogId,
        state: impl RaftReplicationState,
    ) -> Result<(), ConflictOrHigherVoteOrNetworkError>;
}

where DefaultReplicationStream could be used to wrap current append entries calls for compatibility, if needed.

The Network would create the RaftReplicationStream, which is then run in a separate task. It is not openraft's problem how the entries are packetized; the implementation simply calls next_request() to get the target replication state, which internally either reads the current state from the watch channel or waits on it for a new state. The leader state loop simply advances the state stored in the watch channel and optionally sets a timeout (but that could probably also be delegated to the stream itself).

When some entries are replicated and logged remotely, the remote node sends a confirmation in an application-specific way, which will result either in reporting progress or in reporting an error (conflict/higher vote). In case of an error, there is no way for replication to continue, so the loop ends and the "right" error is returned. Upon returning, the driver of the replication in the replication task would inform the raft core and/or restart the replication as needed.

Reporting the progress would just entail setting a new value for the watch, so the state loop would then update its state appropriately based on state change of the replication.

No memory for actual data channels and no entry copying is involved here (aside from reading from the log, but the application should optimize that).

@schreter
Collaborator Author

schreter commented Aug 2, 2024

Now, for RaftStateMachine, we can solve it in a very similar way:

trait RaftStateMachine {
    fn new_apply_stream(&mut self) -> impl RaftApplyStream;
}

/// State for applying committed entries.
trait RaftApplyState: 'static {
    /// Get the next committed log ID to apply up to.
    async fn next_request(&self) -> Result<LogId, RaftStopped>;

    /// Report apply progress.
    fn report_progress(&self, last_applied: LogId) -> Result<(), RaftStopped>;
}

/// Entry apply stream.
trait RaftApplyStream: 'static {
    /// Run the apply loop.
    ///
    /// Read entries from the `log_reader` and apply them to the state machine.
    /// Use the `state` object to get the new log ID to apply up to and to report progress.
    async fn apply(
        self,
        log_reader: impl RaftLogReader,
        start_log_id: LogId,
        state: impl RaftApplyState,
    ) -> Result<(), StorageError>;
}

The apply stream is started at Raft startup in a new task (or runs in the storage task interleaved with other processing as needed). It uses the state object to await the next committed log ID on a watch channel and report apply progress to another watch channel, so raft core can update its state.

In case of error, the apply loop terminates and the Raft stops.

Same as for replication, no actual data is moved between tasks; just the watches are read/set, and no extra memory needs to be allocated. It is again not openraft's responsibility to batch entries; that happens at the application level (and the application can optimize it).

Same as for replication, it would be possible to write a default implementation which simply calls legacy apply() on the state machine.
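For instance, such a compatibility wrapper might look roughly like this (hypothetical sketch; LegacyStateMachine, its batch apply(), read_range(), and the error handling are placeholders):

/// Hypothetical wrapper that drives the legacy batch apply() from the
/// stream-based interface sketched above.
struct LegacyApplyStream<SM> {
    sm: SM,
}

impl<SM: LegacyStateMachine> RaftApplyStream for LegacyApplyStream<SM> {
    async fn apply(
        mut self,
        mut log_reader: impl RaftLogReader,
        mut next: LogId,
        state: impl RaftApplyState,
    ) -> Result<(), StorageError> {
        loop {
            // Stop cleanly when Raft shuts down.
            let Ok(committed) = state.next_request().await else {
                return Ok(());
            };
            // Read the newly committed range and hand it to the legacy apply().
            let entries = log_reader.read_range(next, committed).await?;
            self.sm.apply(entries).await?;
            let _ = state.report_progress(committed);
            next = committed;
        }
    }
}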

What is different here, we need to find a way to send responses to the client. I didn't think this through yet, how to post response objects needed for reporting replies w/o the use of an extra channel. Probably it will require moving reporting the response to a callback on the Entry or so.

@drmingdrmer
Member

trait RaftNetworkFactory<C> {
    /// Create a replication stream streaming log entries to a remote node.
    fn new_replication_stream(
        &mut self,
        target: C::NodeId,
        node: C::Node,
    ) -> impl RaftReplicationStream {
        DefaultReplicationStream(self.new_client(target, node))
    }
}

/// State of the replication for a replication stream.
trait RaftReplicationState {
    /// Get the next target log ID to replicate up to, the current commit ID
    /// and the current Vote to replicate to the follower.
    async fn next_request(&self) -> Result<(LogId, LogId, Vote), ReplicationClosed>;

    /// Report progress of the replication to the leader.
    fn report_progress(&self, flushed_up_to: LogId) -> Result<(), ReplicationClosed>;
}

/// Replication stream.
trait RaftReplicationStream {
    /// Run the replication loop.
    ///
    /// Read entries from the `log_reader` and send them to the `target`.
    /// Use the `state` object to get the new log ID to replicate to and to report progress.
    async fn replicate(
        self,
        log_reader: impl RaftLogReader,
        start_log_id: LogId,
        state: impl RaftReplicationState,
    ) -> Result<(), ConflictOrHigherVoteOrNetworkError>;
}

What about snapshots?
When RaftReplicationStream receives the next_request, part of the log entries may have already been purged. In such a case, RaftLogReader would return a StorageError. The current approach is to switch to snapshot replication, and during this period, log entry replication is paused.

IMHO, there should be a wrapper like ReplicationCore in the current codebase handling the log/snapshot replication switch.

To me the major changes include two parts: replacing channel-based communication with a watch.
What about the report_progress() implementation? I think it should still be a fixed-size channel.

There are a few related details to consider. I cannot tell if it is a completely workable pattern.
But the first things I'm sure can be done are:

  • Move heartbeat out of ReplicationCore, to make the replication logic clearer.
  • And then the channel from RaftCore to ReplicationCore can be replaced with a watch.

The data flow from RaftLogReader to RaftReplicationStream requires further refinement, because while streaming log entries the Vote in the storage may change, and when this happens, the stream must be closed at once.

@drmingdrmer
Member

Now, for RaftStateMachine, we can solve it in a very similar way:

trait RaftStateMachine {
    fn new_apply_stream(&mut self) -> impl RaftApplyStream;
}

/// State for applying committed entries.
trait RaftApplyState: 'static {
    /// Get the next committed log ID to apply up to.
    async fn next_request(&self) -> Result<LogId, RaftStopped>;

    /// Report apply progress.
    fn report_progress(&self, last_applied: LogId) -> Result<(), RaftStopped>;
}

/// Entry apply stream.
trait RaftApplyStream: 'static {
    /// Run the apply loop.
    ///
    /// Read entries from the `log_reader` and apply them to the state machine.
    /// Use the `state` object to get the new log ID to apply up to and to report progress.
    async fn apply(
        self,
        log_reader: impl RaftLogReader,
        start_log_id: LogId,
        state: impl RaftApplyState,
    ) -> Result<(), StorageError>;
}

One minor issue is that report_progress should return the result (RaftTypeConfig::R) of applying the entries.

The RaftApplyStream may not be able to be a long-lived task. There are other commands, such as InstallSnapshot, that also require a mutable reference to the state machine. If RaftApplyStream does not hold a &mut SM, it must hold an exclusive lock on the StateMachine, which may not be an optimal approach.
