
Corfu Objects

dahliamalkhi edited this page Dec 2, 2016 · 9 revisions

Conventional Java objects are turned into Corfu objects via CorfuCompileProxy. This class wraps an in-memory object with methods that back it with a Corfu stream. The object itself resides in the field underlyingObject.

public class CorfuCompileProxy<T> implements ICorfuSMRProxyInternal<T> {

    /** The underlying object. This object stores the actual
    state as well as the version of the object. It also
    provides locks to access the object safely from a
    multi-threaded context. */
   @Getter
   VersionLockedObject<T> underlyingObject;

Four basic functionalities are supported: an accessor helper, a mutator helper, an accessor-mutator, and a transaction helper. These functionalities manipulate the in-memory object by sending updates to the Corfu stream and synchronizing with it.

This class implements ICorfuSMRProxy (to be precise, it implements ICorfuSMRProxyInternal which extends ICorfuSMRProxy). The key SMRProxy methods are precisely the above basic functionalities:

/** An interface for accessing a proxy, which
 * manages an SMR object.
 * Created by mwei on 11/10/16.
 */
public interface ICorfuSMRProxy<T> {

/** Access the state of the object.
 * @param accessMethod      The method to execute when accessing an object.
 * @param <R>               The type to return.
 * @return                  The result of the accessMethod
 */
<R> R access(ICorfuSMRAccess<R,T> accessMethod);

/**
 * Record an SMR function to the log before returning.
 * @param smrUpdateFunction     The name of the function to record.
 * @param args                  The arguments to the function.
 *
 * @return  The address in the log the SMR function was recorded at.
 */
long logUpdate(String smrUpdateFunction, Object... args);

/** Run in a transactional context.
 *
 * @param txFunction    The function to run in a transactional context.
 * @param <R>           The return type.
 * @return              The value supplied by the function.
 */
<R> R TXExecute(Supplier<R> txFunction);

A key field of CorfuCompileProxy is the following:

/** A map containing upcall targets. This takes a SMRMethod string
 * and converts it to an upcall function which transitions an object
 * to a new state given arguments.
 */
@Getter
Map<String, ICorfuSMRUpcallTarget<T>> upcallTargetMap;

In order for this mapping of string-names to object-methods to work, an application developer needs to annotate Mutator and AccessorMutator methods of Corfu objects with string names, as in:

@CorfuObject
class MyCorfuObject {

  @Mutator(name="Set")
  void Set(ValType newValue) { ... }

(For a tutorial on programming new Corfu objects, see Custom-Made Corfu Objects.)
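
To make the name-to-method mapping concrete, here is a minimal, self-contained sketch. It does not use the Corfu API: UpcallTarget, Register, and UpcallDemo are hypothetical names invented for illustration, and the real ICorfuSMRUpcallTarget signature may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an upcall target: applies a named mutation
// to an object of type T, given the recorded arguments.
interface UpcallTarget<T> {
    void upcall(T obj, Object... args);
}

// Hypothetical plain in-memory object with a single mutator.
class Register {
    private String value = "";
    void set(String newValue) { value = newValue; }
    String get() { return value; }
}

class UpcallDemo {
    // Maps the string name recorded in the log to the code that replays it.
    static Map<String, UpcallTarget<Register>> buildUpcallMap() {
        Map<String, UpcallTarget<Register>> map = new HashMap<>();
        map.put("Set", (obj, args) -> obj.set((String) args[0]));
        return map;
    }

    // Replays one recorded update by looking up its string name.
    static void apply(Map<String, UpcallTarget<Register>> map,
                      Register obj, String name, Object... args) {
        UpcallTarget<Register> target = map.get(name);
        if (target == null) {
            throw new IllegalArgumentException("Unknown SMR method: " + name);
        }
        target.upcall(obj, args);
    }
}
```

Calling UpcallDemo.apply(map, register, "Set", "hello") replays a recorded "Set" update against the in-memory object, which is roughly the role the annotation names play when log entries are applied.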

logUpdate method implementation

The goal of logUpdate is to capture an object mutation as a stream entry and append it to the object's stream. Recall that Corfu object methods are identified by their string-name.

/**
 * Record an SMR function to the log before returning.
 *
 * @param smrUpdateFunction The name of the function to record.
 * @param args              The arguments to the function.
 * @return The address in the log the SMR function was recorded at.
 */
@Override
public long logUpdate(boolean isTX, String smrUpdateFunction, Object... args) {

If we neglect transactions for a moment, then logUpdate simply obtains a token from the sequencer and writes an SMR entry representing the requested method and its arguments. This is done asynchronously by invoking the acquireAndWrite method of StreamView:

    // If we aren't in a transaction, we can just write the modification.
    // We need to add the acquired token into the pending upcall list.
    SMREntry smrEntry = new SMREntry(smrUpdateFunction, args, serializer);
    long address = underlyingObject.getStreamViewUnsafe().acquireAndWrite(smrEntry, t -> {
            pendingUpcalls.add(t.getToken());
            return true;
        }, t -> {
            pendingUpcalls.remove(t.getToken());
            return true;
        });
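
The acquire-then-write step can be pictured with a toy in-memory log. This is only a sketch under simplifying assumptions: ToyEntry and ToyLog are hypothetical, a local counter stands in for the sequencer, and the second callback of the excerpt above (which removes a token from pendingUpcalls) is not modeled.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical record of one mutation: method name plus its arguments.
class ToyEntry {
    final String method;
    final Object[] args;
    ToyEntry(String method, Object... args) {
        this.method = method;
        this.args = args;
    }
}

// Hypothetical in-memory log; the counter plays the role of the sequencer.
class ToyLog {
    private final AtomicLong nextToken = new AtomicLong(0);
    private final ConcurrentHashMap<Long, ToyEntry> slots = new ConcurrentHashMap<>();
    // Addresses that were written but not yet applied to the in-memory object.
    final Set<Long> pendingUpcalls = ConcurrentHashMap.newKeySet();

    // Acquires the next address, records it as pending, then writes the entry.
    long logUpdate(String method, Object... args) {
        long token = nextToken.getAndIncrement();
        pendingUpcalls.add(token);
        slots.put(token, new ToyEntry(method, args));
        return token;
    }

    ToyEntry read(long address) {
        return slots.get(address);
    }
}
```

The returned address is what a later read can use to decide how far the in-memory object must be synchronized.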

Inside a transactional context, Mutator and AccessorMutator method invocations are not appended to the log until commit time. They are instead accumulated in the current transactional context object. Therefore, inside a transaction, logUpdate delegates the work to the logUpdate method of the context returned by TransactionalContext.getCurrentContext():

    // If we aren't coming from a transactional context,
    // redirect us to a transactional context first.
    if (TransactionalContext.isInTransaction()) {
        // We generate an entry to avoid exposing the serializer to the tx context.
        SMREntry entry = new SMREntry(smrUpdateFunction, args, serializer);
        return TransactionalContext.getCurrentContext()
                .logUpdate(this, entry);
    }
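
The idea of deferring updates until commit can be sketched as follows. ToyTxContext is a hypothetical class, not the Corfu transactional context: it buffers entries in a write set and appends them to a shared list on commit, and it omits the conflict checking a real commit would need. Returning the write-set index from logUpdate is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical transactional context that buffers updates until commit.
class ToyTxContext {
    private final List<String> sharedLog;                    // stands in for the stream
    private final List<String> writeSet = new ArrayList<>(); // updates not yet visible

    ToyTxContext(List<String> sharedLog) {
        this.sharedLog = sharedLog;
    }

    // Buffers the update; nothing reaches the shared log yet.
    // Returns the position within the write set (an assumption of this sketch).
    int logUpdate(String entry) {
        writeSet.add(entry);
        return writeSet.size() - 1;
    }

    // Appends all buffered updates to the shared log in order.
    void commit() {
        sharedLog.addAll(writeSet);
        writeSet.clear();
    }

    // Discards all buffered updates.
    void abort() {
        writeSet.clear();
    }
}
```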

access method implementation

The goal of the access method is to synchronize the in-memory underlyingObject with the Corfu stream at the specified "timestamp", which is a stream-offset.

As above, the method behaves differently within a transactional context and outside one. Within a transaction, it invokes the access method of the current transactional context. That access method, in addition to applying the underlying object's accessor, also enforces the transaction isolation level; for example, OptimisticTransactionContext records the object in a read-set.

public <R> R access(boolean isTX, ICorfuSMRAccess<R, T> accessMethod) {
    // If we aren't coming from a transactional context,
    // redirect us to a transactional context first.
    if (!isTX) {
        if (TransactionalContext.isInTransaction()) {
            return TransactionalContext.getCurrentContext()
                    .access(this, accessMethod);
        }
    }

Outside a transactional context (i.e., in a normal access), access captures the current log tail position from the sequencer. Then it tries to apply the underlying object's accessor method on an in-memory copy of the object synchronized to that position, or a later one. The basic functionality is as follows:

    // Linearize this read against a timestamp
    final long timestamp =
            rt.getSequencerView()
            .nextToken(Collections.singleton(streamID), 0).getToken();

    // Acquire locks and perform read.
    return underlyingObject.optimisticallyReadThenReadLockThenWriteOnFail(
         (ver,o) -> {
        // check if the version is at least that of the linearized read requested.
        if (ver >= timestamp)
            return accessMethod.access(underlyingObject.getObjectUnsafe());
        // We don't have the right version, so we need to write
        // throwing this exception causes us to take a write lock.
        throw new ConcurrentModificationException();
      },

      // this part is executed if we did not have the right version
      (ver, o) -> {  
            syncObjectUnsafe(underlyingObject, timestamp);
            return accessMethod.access(underlyingObject.getObjectUnsafe());
      }
   );
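
The optimistic-read-then-lock pattern used above can be illustrated with java.util.concurrent.locks.StampedLock. This is a sketch, not the VersionLockedObject implementation: ToyVersionedCounter is hypothetical, and its syncTo method simply pretends that every log position adds 1 to the value.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical versioned object: 'version' is the highest log position applied.
class ToyVersionedCounter {
    private final StampedLock lock = new StampedLock();
    private long version = 0;
    private long value = 0;

    // Toy sync: pretend each log position up to 'target' increments the value.
    // Must be called with the write lock held.
    private void syncTo(long target) {
        while (version < target) {
            version++;
            value++;
        }
    }

    long version() { return version; }

    // Reads the value at a state no older than 'timestamp'.
    long read(long timestamp) {
        // Stage 1: optimistic read, no lock held.
        long stamp = lock.tryOptimisticRead();
        long seenVersion = version;
        long seenValue = value;
        if (lock.validate(stamp)) {
            if (seenVersion >= timestamp) {
                return seenValue;
            }
        } else {
            // Stage 2: a writer interfered, so retry under a read lock.
            stamp = lock.readLock();
            try {
                if (version >= timestamp) {
                    return value;
                }
            } finally {
                lock.unlockRead(stamp);
            }
        }
        // Stage 3: the object is stale; take the write lock and sync it.
        stamp = lock.writeLock();
        try {
            syncTo(timestamp);
            return value;
        } finally {
            lock.unlockWrite(stamp);
        }
    }
}
```

A read at a timestamp the object has already passed returns from stage 1 without blocking; only a stale object pays for the write lock.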

Transactional context complicates this path further; that logic is considerably more involved and is not covered here.

TXExecute method implementation

The goal of TXExecute is to invoke a method inside a transaction in order to provide ACID guarantees for the entire method. The implementation wraps the method with TXBegin() and TXEnd() stream functions. The body of the transaction is executed asynchronously, and on abort, the transaction is automatically retried after a one-second pause. The entire TXExecute method follows:

public <R> R TXExecute(Supplier<R> txFunction) {
    while (true) {
        try {
            rt.getObjectsView().TXBegin();
            R ret = txFunction.get();
            rt.getObjectsView().TXEnd();
            return ret;
        } catch (Exception e) {
            log.warn("Transactional function aborted due to {}, retrying", e);
            try {Thread.sleep(1000); }
            catch (Exception ex) {}
        }
    }
}
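
For comparison, here is a hedged variant of the retry loop, not the Corfu implementation. It assumes a dedicated abort exception, a bound on the number of attempts, and a configurable pause; ToyAbortException and ToyTxRunner are hypothetical names, and txBegin/txEnd only count calls.

```java
import java.util.function.Supplier;

// Hypothetical exception signalling that a transaction aborted.
class ToyAbortException extends RuntimeException {
    ToyAbortException(String message) {
        super(message);
    }
}

// Hypothetical runner; txBegin/txEnd stand in for TXBegin()/TXEnd().
class ToyTxRunner {
    int begins = 0;
    int commits = 0;

    void txBegin() { begins++; }
    void txEnd() { commits++; }

    // Runs txFunction between txBegin and txEnd, retrying only on aborts.
    <R> R execute(Supplier<R> txFunction, int maxAttempts, long backoffMillis) {
        ToyAbortException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                txBegin();
                R ret = txFunction.get();
                txEnd();
                return ret;
            } catch (ToyAbortException e) {
                last = e;
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag instead of swallowing it.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while retrying", ie);
                }
            }
        }
        throw new IllegalStateException(
                "Transaction aborted after " + maxAttempts + " attempts", last);
    }
}
```

Catching only the abort exception lets unrelated failures in the transaction body propagate instead of being retried indefinitely.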