Corfu Objects
Conventional Java objects are turned into Corfu objects via CorfuCompileProxy.
This class wraps an in-memory object with methods that back it up by the Corfu stream. The object itself resides in the field underlyingObject.
public class CorfuCompileProxy<T> implements ICorfuSMRProxyInternal<T> {
    /** The underlying object. This object stores the actual
     *  state as well as the version of the object. It also
     *  provides locks to access the object safely from a
     *  multi-threaded context. */
    @Getter
    VersionLockedObject<T> underlyingObject;
Four basic functionalities are supported: an accessor helper, a mutator helper, an accessor-mutator helper, and a transaction helper. These functionalities manipulate the in-memory object by sending updates to the Corfu stream and synchronizing with it.
This class implements ICorfuSMRProxy (to be precise, it implements ICorfuSMRProxyInternal, which extends ICorfuSMRProxy). The key SMRProxy methods are precisely the above basic functionalities:
/** An interface for accessing a proxy, which
 * manages an SMR object.
 * Created by mwei on 11/10/16.
 */
public interface ICorfuSMRProxy<T> {
    /** Access the state of the object.
     * @param accessMethod The method to execute when accessing an object.
     * @param <R> The type to return.
     * @return The result of the accessMethod
     */
    <R> R access(ICorfuSMRAccess<R,T> accessMethod);

    /**
     * Record an SMR function to the log before returning.
     * @param smrUpdateFunction The name of the function to record.
     * @param args The arguments to the function.
     *
     * @return The address in the log the SMR function was recorded at.
     */
    long logUpdate(String smrUpdateFunction, Object... args);

    /** Run in a transactional context.
     *
     * @param txFunction The function to run in a transactional context.
     * @param <R> The return type.
     * @return The value supplied by the function.
     */
    <R> R TXExecute(Supplier<R> txFunction);
A key field of CorfuCompileProxy is the following:
/** A map containing upcall targets. This takes a SMRMethod string
 * and converts it to an upcall function which transitions an object
 * to a new state given arguments.
 */
@Getter
Map<String, ICorfuSMRUpcallTarget<T>> upcallTargetMap;
In order for this mapping of string-names to object-methods to work, an application developer needs to annotate Mutator and AccessorMutator methods of Corfu objects with string names, as in:
@CorfuObject
class MyCorfuObject {
    @Mutator(name="Set")
    void Set(ValType newValue) { ... }
}
(For a tutorial on programming new Corfu objects, see Custom-Made Corfu Objects.)
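To make the name-to-method mapping concrete, here is a minimal sketch of a table that resolves a recorded method name to a function that mutates an object. The `UpcallTarget` interface, the `Register` class, and the hand-built table are hypothetical stand-ins chosen for illustration; they are not Corfu's actual types, and in Corfu the table is presumably derived from the annotations rather than written by hand.

```java
import java.util.HashMap;
import java.util.Map;

public class UpcallMapSketch {
    /** Hypothetical stand-in for an upcall target: applies an update to an object. */
    interface UpcallTarget<T> {
        Object upcall(T obj, Object... args);
    }

    /** A toy object with a single mutator. */
    static class Register {
        String value = "";

        void set(String newValue) {
            value = newValue;
        }
    }

    /** Builds the name -> method table by hand (an assumption of this sketch). */
    static Map<String, UpcallTarget<Register>> buildTargets() {
        Map<String, UpcallTarget<Register>> targets = new HashMap<>();
        targets.put("Set", (obj, args) -> {
            obj.set((String) args[0]);
            return null;
        });
        return targets;
    }

    /** Looks up the recorded method name and applies it to the object. */
    static void apply(Register obj, String name, Object... args) {
        UpcallTarget<Register> target = buildTargets().get(name);
        if (target == null) {
            throw new IllegalArgumentException("Unknown SMR method: " + name);
        }
        target.upcall(obj, args);
    }

    public static void main(String[] args) {
        Register r = new Register();
        apply(r, "Set", "hello");
        System.out.println(r.value);
    }
}
```

Because an entry in the log carries only a string name and serialized arguments, a lookup of this kind is what lets any client replay the same update against its own in-memory copy.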
The goal of logUpdate is to capture an object mutation as a stream entry and append it to the object's stream. Recall that Corfu object methods are identified by their string-name.
/**
 * Record an SMR function to the log before returning.
 *
 * @param smrUpdateFunction The name of the function to record.
 * @param args The arguments to the function.
 * @return The address in the log the SMR function was recorded at.
 */
@Override
public long logUpdate(boolean isTX, String smrUpdateFunction, Object... args) {
If we neglect transactions for a moment, then logUpdate simply obtains a token from the sequencer and writes an SMR entry representing the requested method and its arguments. This is done asynchronously by invoking the acquireAndWrite method of StreamView:
// If we aren't in a transaction, we can just write the modification.
// We need to add the acquired token into the pending upcall list.
SMREntry smrEntry = new SMREntry(smrUpdateFunction, args, serializer);
long address = underlyingObject.getStreamViewUnsafe().acquireAndWrite(smrEntry, t -> {
    pendingUpcalls.add(t.getToken());
    return true;
}, t -> {
    pendingUpcalls.remove(t.getToken());
    return true;
});
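The non-transactional path can be pictured with a small in-memory model. The sketch below is not Corfu code: `LogUpdateSketch`, its `Entry` class, the `AtomicLong` standing in for the sequencer, and the map standing in for the stream are all hypothetical. It also assumes that the two callbacks passed to acquireAndWrite mean "token acquired" and "token given up", which the excerpt suggests but does not state.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class LogUpdateSketch {
    /** Hypothetical log entry: a method name plus its arguments. */
    static final class Entry {
        final String method;
        final Object[] args;

        Entry(String method, Object... args) {
            this.method = method;
            this.args = args;
        }
    }

    // Stand-in for the sequencer: hands out consecutive addresses.
    private final AtomicLong sequencer = new AtomicLong(0);
    // Stand-in for the stream: address -> entry.
    private final Map<Long, Entry> log = new ConcurrentHashMap<>();
    // Addresses whose updates have been logged but not yet applied locally.
    final Set<Long> pendingUpcalls = ConcurrentHashMap.newKeySet();

    /** Acquires an address, records it as pending, and writes the entry there. */
    long logUpdate(String method, Object... args) {
        long address = sequencer.getAndIncrement();
        pendingUpcalls.add(address);
        try {
            log.put(address, new Entry(method, args));
            return address;
        } catch (RuntimeException e) {
            // Assumed analogue of the second callback: forget the token on failure.
            pendingUpcalls.remove(address);
            throw e;
        }
    }

    /** Returns the entry written at the given address, or null if none. */
    Entry read(long address) {
        return log.get(address);
    }
}
```

Note that the in-memory object is not modified here; the update only becomes visible locally once the object is later synchronized with the log.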
Inside a transactional context, Mutator and AccessorMutator method invocations are not appended to the log until commit time. They are instead accumulated in the current transactional context object. Therefore, inside a transactional context, we invoke the logUpdate method of the current context (obtained via TransactionalContext.getCurrentContext()) to do the work:
// If we aren't coming from a transactional context,
// redirect us to a transactional context first.
if (TransactionalContext.isInTransaction()) {
    // We generate an entry to avoid exposing the serializer to the tx context.
    SMREntry entry = new SMREntry(smrUpdateFunction, args, serializer);
    return TransactionalContext.getCurrentContext()
            .logUpdate(this, entry);
}
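The buffering behavior described above can be sketched as follows. This is a toy model under stated assumptions: `TxBufferSketch`, its `begin`/`commit` methods, and the choice to return the position within the write buffer are all hypothetical and are not taken from Corfu's transaction contexts.

```java
import java.util.ArrayList;
import java.util.List;

public class TxBufferSketch {
    // Stand-in for the shared log.
    final List<String> log = new ArrayList<>();
    // Buffered updates; non-null only while a transaction is open.
    private List<String> writeSet = null;

    void begin() {
        writeSet = new ArrayList<>();
    }

    boolean isInTransaction() {
        return writeSet != null;
    }

    /**
     * Outside a transaction, appends to the log and returns the log address.
     * Inside one, only buffers the entry and returns its buffer position
     * (an assumption of this sketch).
     */
    long logUpdate(String entry) {
        if (isInTransaction()) {
            writeSet.add(entry);
            return writeSet.size() - 1;
        }
        log.add(entry);
        return log.size() - 1;
    }

    /** Appends every buffered update to the log and closes the transaction. */
    void commit() {
        log.addAll(writeSet);
        writeSet = null;
    }
}
```

The point of the indirection is that nothing reaches the shared log until commit, so an aborted transaction leaves no trace for other clients to replay.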
The goal of the access method is to synchronize the in-memory underlyingObject with the Corfu stream at the specified "timestamp", which is a stream-offset.
As above, the method behaves differently within a transactional context and outside one. Within a transaction, it invokes the access method of the current transactional context. That access method, in addition to applying the underlying object's accessor, also enforces the transaction isolation level; for example, OptimisticTransactionContext records the object in a read-set.
public <R> R access(boolean isTX, ICorfuSMRAccess<R, T> accessMethod) {
    // If we aren't coming from a transactional context,
    // redirect us to a transactional context first.
    if (!isTX) {
        if (TransactionalContext.isInTransaction()) {
            return TransactionalContext.getCurrentContext()
                    .access(this, accessMethod);
        }
    }
Outside a transactional context (i.e., in a normal access), access captures the current log tail position from the sequencer. Then it tries to apply the underlying object's accessor method on an in-memory copy of the object synchronized to that position, or a later one. The basic functionality is as follows:
// Linearize this read against a timestamp
final long timestamp =
        rt.getSequencerView()
                .nextToken(Collections.singleton(streamID), 0).getToken();
// Acquire locks and perform read.
return underlyingObject.optimisticallyReadThenReadLockThenWriteOnFail(
        (ver,o) -> {
            // check if the version is at least that of the linearized read requested.
            if (ver >= timestamp)
                return accessMethod.access(underlyingObject.getObjectUnsafe());
            // We don't have the right version, so we need to write
            // throwing this exception causes us to take a write lock.
            throw new ConcurrentModificationException();
        },
        // this part is executed if we did not have the right version
        (ver, o) -> {
            syncObjectUnsafe(underlyingObject, timestamp);
            return accessMethod.access(underlyingObject.getObjectUnsafe());
        }
);
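The read-then-sync pattern above can be illustrated with a simplified model. In this sketch the "object" is an integer sum, the log is a list of deltas, and a `ReentrantReadWriteLock` takes the place of the optimistic-read helper; `AccessSketch` and all of its members are hypothetical names, and the optimistic (lock-free) first attempt of the original is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

public class AccessSketch {
    // Stand-in for the stream: each entry is a delta to add to the state.
    private final List<Integer> log = new ArrayList<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private long version = -1; // address of the last entry applied to state
    private int state = 0;     // the in-memory object
    int syncCount = 0;         // how many times a sync was needed (for illustration)

    synchronized void append(int delta) {
        log.add(delta);
    }

    private synchronized long tail() {
        return log.size() - 1;
    }

    private synchronized int entryAt(long address) {
        return log.get((int) address);
    }

    <R> R access(Function<Integer, R> accessor) {
        // Linearize the read against the current tail of the log.
        final long timestamp = tail();
        lock.readLock().lock();
        try {
            // Fast path: the object is already at least as new as the timestamp.
            if (version >= timestamp) {
                return accessor.apply(state);
            }
        } finally {
            lock.readLock().unlock();
        }
        // Slow path: take the write lock and replay the missing entries.
        lock.writeLock().lock();
        try {
            if (version < timestamp) { // another thread may have synced meanwhile
                syncCount++;
                while (version < timestamp) {
                    state += entryAt(version + 1);
                    version++;
                }
            }
            return accessor.apply(state);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

A second access with no intervening appends takes the fast path, which is the case the version check in the original is designed to make cheap.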
Transactional context complicates this further; the full logic is considerably more complex.
The goal of TXExecute is to invoke a method inside a transaction in order to provide ACID guarantees for the entire method. The implementation wraps the method between calls to TXBegin() and TXEnd(). The body of the transaction is executed between them, and on abort (in the code shown, on any exception) the transaction is automatically retried after a one-second sleep. The entire TXExecute method follows:
public <R> R TXExecute(Supplier<R> txFunction) {
    while (true) {
        try {
            rt.getObjectsView().TXBegin();
            R ret = txFunction.get();
            rt.getObjectsView().TXEnd();
            return ret;
        } catch (Exception e) {
            log.warn("Transactional function aborted due to {}, retrying", e);
            try {Thread.sleep(1000); }
            catch (Exception ex) {}
        }
    }
}
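The loop above retries forever and treats every exception as an abort. As a point of comparison, here is a hedged sketch of the same begin/body/end pattern with a bounded number of attempts and a dedicated abort exception. `TxRetrySketch`, `AbortedException`, and the `begin`/`end` hooks are hypothetical and are not part of Corfu; the one-second sleep of the original is omitted to keep the sketch short.

```java
import java.util.function.Supplier;

public class TxRetrySketch {
    /** Hypothetical marker for a transaction that failed to commit. */
    static class AbortedException extends RuntimeException {
    }

    /**
     * Runs body between begin and end, retrying on abort.
     * Gives up and rethrows after maxAttempts failed attempts.
     */
    static <R> R txExecute(Runnable begin, Runnable end, Supplier<R> body, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                begin.run();
                R ret = body.get();
                end.run(); // assumed to throw AbortedException on conflict
                return ret;
            } catch (AbortedException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
                // Otherwise fall through and retry the whole transaction.
            }
        }
    }

    public static void main(String[] args) {
        int[] commits = {0};
        String result = txExecute(
                () -> { },
                () -> {
                    // Simulate two aborted commits before a successful one.
                    if (++commits[0] < 3) {
                        throw new AbortedException();
                    }
                },
                () -> "ok",
                5);
        System.out.println(result + " after " + commits[0] + " attempts");
    }
}
```

Catching only the abort exception lets programming errors in the body surface immediately instead of being retried, which is a design choice of this sketch rather than a description of the original.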