SMRObject
The class CorfuSMRObjectProxy is responsible for the Corfu replication magic. It turns a plain Java object of type P into a replicated object, where the history of updates to P is transparently backed by a Corfu stream.
public class CorfuSMRObjectProxy<P> extends CorfuObjectProxy<P>
There are fundamentally two functionalities:
- Apply updates from the stream to the in-memory object state,
- Generate an update by appending it to the stream.
Respectively, these are carried out by the sync method and the writeUpdate method.
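The division of labor between the two methods can be illustrated with a minimal, self-contained sketch. It is not the Corfu implementation: InMemoryStream and SketchProxy are hypothetical names, the stream is an in-memory list rather than a Corfu stream, and updates are modeled as Consumer<P> callbacks instead of serialized SMR entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a Corfu stream: an append-only in-memory log. */
class InMemoryStream<P> {
    private final List<Consumer<P>> entries = new ArrayList<>();

    /** Appends an update and returns its address (its position in the log). */
    synchronized long append(Consumer<P> update) {
        entries.add(update);
        return entries.size() - 1;
    }

    /** Returns the updates at addresses from..to, both inclusive. */
    synchronized List<Consumer<P>> read(long from, long to) {
        int end = (int) Math.min(to + 1, entries.size());
        if (from >= end) {
            return new ArrayList<>();
        }
        return new ArrayList<>(entries.subList((int) from, end));
    }
}

/** Hypothetical proxy: the object state changes only by replaying the log. */
class SketchProxy<P> {
    private final InMemoryStream<P> stream;
    private long nextToApply = 0; // first address not yet applied locally

    SketchProxy(InMemoryStream<P> stream) {
        this.stream = stream;
    }

    /** Generates an update by appending it to the stream; no object is touched. */
    long writeUpdate(Consumer<P> update) {
        return stream.append(update);
    }

    /** Applies pending updates, in log order, up to and including maxPos. */
    synchronized void sync(P obj, long maxPos) {
        for (Consumer<P> update : stream.read(nextToApply, maxPos)) {
            update.accept(obj);
            nextToApply++;
        }
    }
}
```

In this sketch, two SketchProxy instances that share one InMemoryStream converge on the same state once both have synced to the same address, because each replica replays the same updates in the same order.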
sync is responsible for synchronizing the in-memory object state with the current state of the stream. It reads the stream up to maxPos and applies the updates one by one.
public synchronized void sync(P obj, long maxPos) {
    // Hold the write lock while the in-memory state is being modified.
    try (LockUtils.AutoCloseRWLock writeLock = new LockUtils.AutoCloseRWLock(rwLock).writeLock()) {
        // Read every entry up to maxPos and apply each one in turn.
        LogData[] entries = sv.readTo(maxPos);
        Arrays.stream(entries)
              .forEach(m -> applyUpdate(m.getGlobalAddress(), (LogEntry) m.getPayload(runtime), obj));
    }
}
applyUpdate takes an entry from a stream and reads its content as an array of SMR method invocations, which are invoked one by one.
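boolean applyUpdate(long address, LogEntry entry, P obj) {
    // Only entries that carry SMR updates are applied.
    if (entry instanceof ISMRConsumable) {
        // Apply each SMR update that the entry holds for this object's stream.
        ((ISMRConsumable) entry).getSMRUpdates(getStreamID())
                .stream()
                .forEach(x -> applySMRUpdate(address, x, obj));
    }
    ...
}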
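The filtering step in applyUpdate can be sketched in isolation. The example below assumes, based only on the getSMRUpdates(getStreamID()) call, that one log entry may hold updates keyed by stream ID. SketchConsumable, SketchMultiEntry and SketchApplier are hypothetical names, and updates are reduced to plain method-name strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical counterpart of ISMRConsumable: yields the updates for one stream. */
interface SketchConsumable {
    List<String> getSMRUpdates(UUID streamID);
}

/** Hypothetical log entry holding updates (plain strings here) per stream. */
class SketchMultiEntry implements SketchConsumable {
    private final Map<UUID, List<String>> updatesByStream = new HashMap<>();

    void add(UUID streamID, String update) {
        updatesByStream.computeIfAbsent(streamID, k -> new ArrayList<>()).add(update);
    }

    @Override
    public List<String> getSMRUpdates(UUID streamID) {
        return updatesByStream.getOrDefault(streamID, Collections.emptyList());
    }
}

class SketchApplier {
    /** Applies, in order, only the updates for streamID; returns how many ran. */
    static int applyUpdate(Object entry, UUID streamID, List<String> applied) {
        if (!(entry instanceof SketchConsumable)) {
            return 0; // not an SMR entry: nothing to apply
        }
        int count = 0;
        for (String update : ((SketchConsumable) entry).getSMRUpdates(streamID)) {
            applied.add(update); // stands in for invoking the SMR method
            count++;
        }
        return count;
    }
}
```

Every update in the entry is applied under the same address, which matches the way the original code passes one address to each applySMRUpdate call.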
The actual invocation is done inside applySMRUpdate, which relies on finding a method to invoke that has the corresponding name. If a CompletableFuture is associated with the stream address that has been applied, it is completed with the method's return value after the invocation. Finally, if a postHandler is registered with this object, it is invoked after the method has been applied and the future completed.
boolean applySMRUpdate(long address, SMREntry entry, P obj) {
    // Find the method by using the method name hash table.
    Method m = ... ;
    Object ret = m.invoke(obj, entry.getSMRArguments());
    // Complete the future registered for this address, if any, with the return value.
    if (completableFutureMap.containsKey(address)) {
        completableFutureMap.get(address).complete(ret);
    }
    // Notify the post-handler, if one is registered.
    if (postHandler != null) {
        postHandler.handle(entry.getSMRMethod(), entry.getSMRArguments(), obj);
    }
    ...
}
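The three steps above (look up by name, invoke, then notify) can be sketched with standard Java reflection. This is an illustration under assumptions, not the proxy's code: SketchDispatcher and its nested PostHandler interface are hypothetical, the name table is built from Class.getMethods(), and overloaded methods are deliberately not handled.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical dispatcher: invokes a named method on obj and reports the result. */
class SketchDispatcher<P> {
    /** Hypothetical callback type, mirroring the shape of postHandler.handle(...). */
    interface PostHandler<T> {
        void handle(String method, Object[] arguments, T obj);
    }

    private final Map<String, Method> methodsByName = new HashMap<>();
    final Map<Long, CompletableFuture<Object>> completableFutureMap = new HashMap<>();
    PostHandler<P> postHandler; // may be null

    SketchDispatcher(Class<?> type) {
        // Index public methods by name. Overloads are not handled in this
        // sketch: the last method seen for a given name wins.
        for (Method m : type.getMethods()) {
            methodsByName.put(m.getName(), m);
        }
    }

    boolean applySMRUpdate(long address, String name, Object[] arguments, P obj) {
        Method m = methodsByName.get(name);
        if (m == null) {
            return false; // no method with that name
        }
        Object ret;
        try {
            ret = m.invoke(obj, arguments);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("invocation of " + name + " failed", e);
        }
        // Hand the return value to whoever is waiting on this address.
        CompletableFuture<Object> future = completableFutureMap.get(address);
        if (future != null) {
            future.complete(ret);
        }
        // The post-handler runs last, after the future has been completed.
        if (postHandler != null) {
            postHandler.handle(name, arguments, obj);
        }
        return true;
    }
}
```

For example, a dispatcher built for java.util.concurrent.atomic.AtomicLong can apply an "addAndGet" update with the argument array `new Object[]{5L}`; a future registered under the same address then completes with the boxed return value.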
One subtle issue to note is that during m.invoke(), the class signals that the in-memory object is being updated by setting methodAccessMode to true; it unsets it right after the invocation.
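The set-then-unset pattern can be sketched as follows. AccessModeGuard and invokeGuarded are hypothetical names, and the use of try/finally is an assumption made for this sketch (it clears the flag even when the invocation throws); the text above does not say how the proxy itself resets the flag.

```java
import java.util.function.Supplier;

/** Hypothetical holder for a flag that is true only while an update is applied. */
class AccessModeGuard {
    private boolean methodAccessMode = false;

    boolean isMethodAccessMode() {
        return methodAccessMode;
    }

    /** Runs the invocation with the flag set, and always clears it afterwards. */
    <R> R invokeGuarded(Supplier<R> invocation) {
        methodAccessMode = true;
        try {
            return invocation.get();
        } finally {
            // Assumption of this sketch: reset even if the invocation throws.
            methodAccessMode = false;
        }
    }
}
```

Code that runs inside the guarded invocation can call isMethodAccessMode() to tell that it is executing as part of an update rather than as an ordinary call.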