8000 SMRObject · CorfuDB/CorfuDB Wiki · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

SMRObject

dahliamalkhi edited this page Dec 2, 2016 · 1 revision

The class CorfuSMRObjectProxy is responsible for the Corfu replication magic. It turns a plain Java object of type P into a replicated object, where the history of updates to P is transparently backed by a Corfu stream.

    public class CorfuSMRObjectProxy<P> extends CorfuObjectProxy<P> 

The are fundamentally two functionalities:

  1. Apply updates from the stream to the in-memory object state,
  2. Generate an update by appending it to the stream.

Respectively, these are carried by the sync method and the writeUpdate method.

sync functionality

sync is responsible for synchronizing the in-memory object state with the current state of the stream. It reads the stream up to maxpos and applies updates one by one.

public synchronized void sync(P obj, long maxPos) {
    try (LockUtils.AutoCloseRWLock writeLock = new LockUtils.AutoCloseRWLock(rwLock).writeLock()) {
        LogData[] entries = sv.readTo(maxPos);
        Arrays.stream(entries)
                .forEach(m -> applyUpdate(m.getGlobalAddress(), (LogEntry) m.getPayload(runtime), obj));
    }

applyUpdate takes an entry from a stream and reads its content as an array of SMR method invocations, which are invoked one by one.

boolean applyUpdate(long address, LogEntry entry, P obj) {
    if (entry instanceof ISMRConsumable)
    {
        ((ISMRConsumable) entry).getSMRUpdates(getStreamID())
                .stream()
                .forEach(x -> applySMRUpdate(address, x, obj));
    }

The actual invocation is done inside applySMRUpdates, which relies on being able to find a method to invoke which has the corresponding name. If there is a completableFuture associated with the stream-address that has been applied, it is invoked post the invocation. Finally, if there is a postHandler registered with this object, it is invoked after both the method application is done, and the completableFuture instantiated.

boolean applySMRUpdate(long address, SMREntry entry, P obj) {
        // Find the method by using the method name hash table.
        Method m = ... ;
        Object ret = m.invoke(obj, entry.getSMRArguments());

        if (completableFutureMap.containsKey(address)) {
            completableFutureMap.get(address).complete(ret);
        }
        if (postHandler != null) 
            postHandler.handle(entry.getSMRMethod(), entry.getSMRArguments(), obj);
        ...
}

One subtle issue to note is that during m.invoke(), the class signals that the in-memory object is being updating by setting methodAccessMode to true; it unsets it right after the invocation.

writeUpdate functionality

Clone this wiki locally
0