CopyCat is an extensible Java-based implementation of the Raft consensus protocol.
The core of CopyCat is a framework designed to support a variety of protocols and transports. CopyCat provides a simple, extensible API that can be used to build a strongly consistent, fault- and partition-tolerant state machine over any mode of communication. CopyCat's Raft implementation supports advanced features of the Raft algorithm such as snapshotting and dynamic cluster configuration changes, and provides additional optimizations for scenarios such as failure detection and read-only state queries.
CopyCat is a pluggable framework, providing protocol and endpoint implementations for various frameworks such as Netty and Vert.x.
Note that this version requires Java 8. A Java 7 compatible version is also available.
CopyCat is a "protocol agnostic" implementation of the Raft consensus algorithm. It provides a framework for constructing a partition-tolerant replicated state machine over a variety of wire-level protocols. It sounds complicated, but the API is actually quite simple. Here's a quick example.
public class KeyValueStore implements StateMachine {

  @Stateful
  private Map<String, Object> data = new HashMap<>();

  @Command(type = Command.Type.READ)
  public Object get(String key) {
    return data.get(key);
  }

  @Command(type = Command.Type.WRITE)
  public void set(String key, Object value) {
    data.put(key, value);
  }

  @Command(type = Command.Type.WRITE)
  public void delete(String key) {
    data.remove(key);
  }

}
Here we've created a simple key-value store. By deploying this state machine on several nodes in a cluster, CopyCat will ensure commands (i.e. get, set, and delete) are applied to the state machine in the order in which they're submitted to the cluster (log order). Internally, CopyCat uses a replicated log to order and replicate commands, and it uses leader election to coordinate log replication. When the replicated log grows too large, CopyCat will take a snapshot of the @Stateful state machine state and compact the log.

First, we create a log in which commands will be stored:

Log log = new FileLog("key-value.log");
To configure the CopyCat cluster, we simply create a ClusterConfig.
ClusterConfig cluster = new ClusterConfig();
cluster.setLocalMember("tcp://localhost:8080");
cluster.setRemoteMembers("tcp://localhost:8081", "tcp://localhost:8082");
Note that the cluster configuration identifies a particular protocol, tcp. These are the endpoints the nodes within the CopyCat cluster use to communicate with one another. CopyCat provides a number of different protocol and endpoint implementations.

Additionally, CopyCat cluster membership is dynamic, and the ClusterConfig is Observable. This means that if the ClusterConfig is changed while the cluster is running, CopyCat will pick up the membership change and replicate the cluster configuration in a safe manner.
Now that the cluster has been set up, we simply create a CopyCat instance, specifying an endpoint through which the outside world can communicate with the cluster.
CopyCat copycat = new CopyCat("http://localhost:5000", new KeyValueStore(), log, cluster);
copycat.start();
That's it! We've just created a strongly consistent, fault-tolerant key-value store with an HTTP API in less than 25 lines of code!
public class StronglyConsistentFaultTolerantAndTotallyAwesomeKeyValueStore implements StateMachine {

  public static void main(String[] args) {
    // Create the local file log.
    Log log = new FileLog("key-value.log");

    // Configure the cluster.
    ClusterConfig cluster = new ClusterConfig();
    cluster.setLocalMember("tcp://localhost:8080");
    cluster.setRemoteMembers("tcp://localhost:8081", "tcp://localhost:8082");

    // Create and start a server at localhost:5000.
    new CopyCat("http://localhost:5000", new StronglyConsistentFaultTolerantAndTotallyAwesomeKeyValueStore(), log, cluster).start();
  }

  @Stateful
  private Map<String, Object> data = new HashMap<>();

  @Command(type = Command.Type.READ)
  public Object get(String key) {
    return data.get(key);
  }

  @Command(type = Command.Type.WRITE)
  public void set(String key, Object value) {
    data.put(key, value);
  }

  @Command(type = Command.Type.WRITE)
  public void delete(String key) {
    data.remove(key);
  }

}
We can now execute commands on the state machine by making POST requests to the HTTP interface.
POST http://localhost:5000/set
["foo", "Hello world!"]
200 OK
POST http://localhost:5000/get
["foo"]
200 OK
{
"result": "Hello world!"
}
POST http://localhost:5000/delete
["foo"]
200 OK
CopyCat doesn't require that commands be submitted via an endpoint. Rather than submitting commands via the HTTP endpoint, simply construct a CopyCatContext instance and submit commands directly to the cluster via the context.
CopyCatContext context = new CopyCatContext(new KeyValueStore(), log, cluster);

// Set a key in the key-value store.
context.submitCommand("set", "foo", "Hello world!").thenRun(() -> {
  context.submitCommand("get", "foo").whenComplete((result, error) -> {
    context.submitCommand("delete", "foo").thenRun(() -> System.out.println("Deleted 'foo'"));
  });
});
CopyCat uses a Raft-based consensus algorithm to perform leader election and state replication. Each node in a CopyCat cluster may be in one of three states at any given time - follower, candidate, or leader. Each node in the cluster maintains an internal log of replicated commands. When a command is submitted to a CopyCat cluster, the command is forwarded to the cluster leader. The leader then logs the command and replicates it to a majority of the cluster. Once the command has been replicated, the leader applies the command to its local state machine and replies with the command result.
For the most in-depth description of how CopyCat works, see In Search of an Understandable Consensus Algorithm by Diego Ongaro and John Ousterhout.
Each node in a CopyCat cluster contains a state machine to which the node applies commands sent to the cluster. State machines are simply classes that implement CopyCat's StateMachine interface, but there are a couple of very important aspects to note about state machines:
- Given the same commands in the same order, state machines should always arrive at the same state with the same output.
- Following from that rule, each node's state machine should be identical.
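As a hedged sketch of what the first rule means in practice (the counter state machine below is purely illustrative): commands must derive their results only from their arguments and the current state, never from sources like the system clock that differ between replicas.

public class CounterStateMachine implements StateMachine {

  @Stateful
  private long value;

  // Deterministic: the result depends only on the argument and the
  // current state, so every replica arrives at the same value.
  @Command(type = Command.Type.WRITE)
  public long incrementBy(long amount) {
    value += amount;
    return value;
  }

  // NOT deterministic: each replica would read its own clock and
  // diverge. Pass the timestamp in as a command argument instead.
  @Command(type = Command.Type.WRITE)
  public long touch() {
    value = System.currentTimeMillis();
    return value;
  }

}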
Commands are state change instructions that are submitted to the CopyCat cluster and ultimately applied to the state machine. For instance, in a key-value store, a command could be something like get or set. CopyCat provides unique features that allow it to optimize handling of certain command types. For instance, read-only commands which don't contribute to the system state will not be replicated, but commands that do alter the system state are eventually replicated to all the nodes in the cluster.
CopyCat nodes share state using a replicated log. When a command is submitted to the CopyCat cluster, the command is appended to the leader's log and replicated to other nodes in the cluster. If a node dies and later restarts, the node will rebuild its internal state by reapplying logged commands to its state machine. The log is essential to the operation of the CopyCat cluster in that it both helps maintain consistency of state across nodes and assists in leader election. CopyCat provides both in-memory logs for testing and file-based logs for production.
In order to ensure logs do not grow too large for the disk, CopyCat replicas periodically take and persist snapshots of the state machine state. CopyCat manages snapshots differently than described in the original Raft paper. Rather than persisting snapshots to separate snapshot files and replicating them with an additional RPC method, CopyCat appends snapshots directly to the log. This allows CopyCat to minimize complexity by transferring snapshots as a normal part of log replication.
Normally, when a node crashes and recovers, it restores its state using the latest snapshot and then rebuilds the rest of its state by reapplying committed command entries. But in some cases, replicas can become so far out of sync that the cluster leader has already written a new snapshot to its log. In that case, the leader will replicate its latest snapshot to the recovering node, allowing it to start with the leader's snapshot.
CopyCat replicas communicate with one another through a user-defined cluster configuration. The CopyCat cluster is very flexible, and the protocol underlying the CopyCat cluster is pluggable. Additionally, for cases where cluster configuration may change over time, CopyCat supports runtime cluster configuration changes. As with other state changes, all cluster configuration changes are performed through the cluster leader.
CopyCat clusters use leader election to maintain synchronization of nodes. In CopyCat, all state changes are performed via the leader. In other words, commands that are submitted to the cluster are always forwarded to the leader for processing. This leads to a much simpler implementation of single-copy consistency.
Raft's leader election algorithm relies largely on the log to elect a leader. Each node in the cluster can serve one of three roles at any given time - follower, candidate, or leader - each of which serves a specific purpose in contributing to elections, replicating logs, and maintaining general consistency of state across the cluster.
When a node is first started, it is initialized as a follower. The follower serves only to listen for synchronization requests from cluster leaders and apply replicated log entries to its state machine. If the follower does not receive a request from the cluster leader for a configurable amount of time, it will transition to a candidate and start a new election.
The candidate role occurs when a replica is announcing its candidacy to become the cluster leader. Candidacy occurs when a follower has not heard from the cluster leader for a configurable amount of time. When a replica becomes a candidate, it requests votes from each other member of the cluster. The voting algorithm is essential to the consistency of CopyCat logs. When a replica receives a vote request, it compares the status of the requesting node's log to its own log and decides whether to vote for the candidate based on how up-to-date the candidate's log is. This ensures that only replicas with the most up-to-date logs can become the cluster leader.
Once a replica has won an election, it transitions to the leader role. The leader is the node through which all commands and configuration changes are performed. When a command is submitted to the cluster, the command is forwarded to the leader. Based on the command type - read or write - the leader will then replicate the command to its followers, apply it to its state machine, and return the result. The leader is also responsible for maintaining log consistency during cluster configuration changes.
The copycat-core project is purely an implementation of the Raft consensus algorithm. It does not implement any specific transport aside from a local transport for testing. Instead, the CopyCat API is designed to allow users to implement the transport layer using the Protocol API. CopyCat does, however, provide some core protocol implementations in the copycat-netty and copycat-vertx projects.
CopyCat provides a framework for building fault-tolerant state machines on the Raft consensus algorithm, but fault-tolerant distributed systems are of no use if they can't be accessed from the outside world. CopyCat's Endpoint API facilitates creating user-facing interfaces (servers) for submitting commands to the CopyCat cluster.
Creating a state machine
To create a state machine in CopyCat, simply implement the StateMachine interface.
public class MyStateMachine implements StateMachine {
}
The StateMachine interface is simply a marker interface; it is important to remember that all public methods within a state machine can be called as CopyCat commands.
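One practical consequence, sketched below with illustrative method names: keep internal helpers non-public so they can't be invoked as commands.

public class MyStateMachine implements StateMachine {
  private final Map<String, Object> data = new HashMap<>();

  // Public, so callable as a CopyCat command.
  public Object get(String key) {
    return data.get(key);
  }

  // Package-private, so it is not exposed as a command.
  void internalCleanup() {
    data.clear();
  }
}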
Providing command types
When a command is submitted to the CopyCat cluster, the command is first written to a log and replicated to other nodes in the cluster. Once the log entry has been replicated to a majority of the cluster, the command is applied to the leader's state machine and the response is returned.
In many cases, CopyCat can avoid writing any data to the log at all. In particular, when a read-only command is submitted to the cluster, CopyCat does not need to log and replicate that command since it has no effect on the machine state. Instead, CopyCat can simply ensure that the cluster is in sync, apply the command to the state machine, and return the result. However, in order to do this, CopyCat needs some additional information about each specific command.
To provide command information to CopyCat, use the @Command annotation on a command method.
Each command can have one of three types:
- READ
- WRITE
- READ_WRITE

By default, all commands are of the type READ_WRITE. While there is no real difference between the WRITE and READ_WRITE types, the READ type is important in allowing CopyCat to identify read-only commands. It is important that any command identified as a READ command never modify the machine state.
Let's look at an example of a command provider:
public class MyStateMachine implements StateMachine {
  private final Map<String, Object> data = new HashMap<>();

  @Command(type = Command.Type.READ)
  public Object read(String key) {
    return data.get(key);
  }

  @Command(type = Command.Type.WRITE)
  public void write(String key, Object value) {
    data.put(key, value);
  }
}
Taking snapshots
One of the issues with a replicated log is that over time it will only continue to grow. There are a couple of ways to potentially handle this, and CopyCat uses the snapshotting method recommended by the authors of the Raft algorithm. However, in favor of simplicity, the CopyCat snapshotting implementation does differ slightly from the one described in the Raft paper. Rather than storing snapshots in a separate snapshot file, CopyCat stores snapshots as normal log entries, making it easier to replicate snapshots when replicas fall too far out of sync. Additionally, CopyCat guarantees that the first entry in any log will always be a SnapshotEntry, again helping to ease the process of replicating snapshots to far out-of-date replicas.
All snapshot serialization, storage, and loading is handled by CopyCat internally. Users need only annotate stateful fields with the @Stateful annotation. Once the log grows to a predetermined size (configurable in CopyCatConfig), CopyCat will take a snapshot of the state machine state and wipe all previous log entries.
public class MyStateMachine implements StateMachine {
  @Stateful
  private Map<String, Object> data = new HashMap<>();
}
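The size threshold that triggers a snapshot is set on the CopyCatConfig described in the next section; a minimal sketch using the setMaxLogSize method documented below:

// Snapshot and compact once the log exceeds ~8 MB instead of the
// 32 MB default.
CopyCatConfig config = new CopyCatConfig();
config.setMaxLogSize(8 * 1024 * 1024);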
CopyCat exposes a configuration API that allows users to configure how CopyCat behaves during elections and how it replicates commands. To configure a CopyCat replica, create a CopyCatConfig to pass to the CopyCatContext constructor.
CopyCatConfig config = new CopyCatConfig();
The CopyCatConfig exposes the following configuration methods:
- setElectionTimeout/withElectionTimeout - sets the timeout within which a follower must receive an AppendEntries request from the leader before starting a new election. This timeout is also used to calculate the election timeout during elections. Defaults to 2000 milliseconds
- setHeartbeatInterval/withHeartbeatInterval - sets the interval at which the leader will send heartbeats (AppendEntries requests) to followers. Defaults to 500 milliseconds
- setRequireWriteQuorum/withRequireWriteQuorum - sets whether to require a quorum during write operations. It is strongly recommended that this remain enabled for consistency. Defaults to true (enabled)
- setRequireReadQuorum/withRequireReadQuorum - sets whether to require a quorum during read operations. Read quorums can optionally be disabled in order to improve performance at the risk of reading stale data. When read quorums are disabled, the leader will immediately respond to READ type commands by applying the command to its state machine and returning the result. Defaults to true (enabled)
- setMaxLogSize/withMaxLogSize - sets the maximum log size before a snapshot should be taken. As entries are appended to the local log, replicas will monitor the size of the local log to determine whether a snapshot should be taken based on this value. Defaults to 32 * 1024^2
- setCorrelationStrategy/withCorrelationStrategy - CopyCat generates correlation identifiers for each request/response pair that it sends. This helps in correlating messages across various protocols. By default, CopyCat uses UuidCorrelationStrategy - a UUID based correlation ID generator - but depending on the protocol being used, the MonotonicCorrelationStrategy, which generates monotonically increasing IDs, may be safe to use (it's safe with all core CopyCat protocols). Defaults to UuidCorrelationStrategy
- setTimerStrategy/withTimerStrategy - sets the replica timer strategy. This allows users to control how CopyCat's internal timers work. By default, replicas use a ThreadTimerStrategy which uses the Java Timer to schedule delays on a background thread. Users can implement their own TimerStrategy in order to, for example, implement an event loop based timer. Timers should return monotonically increasing timer IDs that never repeat. Note also that timers do not need to be multi-threaded since CopyCat only sets a timeout a couple of times a second. Defaults to ThreadTimerStrategy
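Putting a few of these together, a hedged sketch (assuming the with* variants return the configuration for chaining, and that CopyCatContext accepts the config as an additional constructor argument):

CopyCatConfig config = new CopyCatConfig()
  .withElectionTimeout(5000)      // follower waits 5 seconds before starting an election
  .withHeartbeatInterval(1000)    // leader heartbeats once per second
  .withRequireReadQuorum(false);  // faster reads at the risk of stale data

// Assumed constructor shape; the context is covered in a later section.
CopyCatContext context = new CopyCatContext(new MyStateMachine(), log, cluster, config);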
Configuring the cluster
When a CopyCat cluster is first started, the cluster configuration must be explicitly provided by the user. However, as the cluster runs, explicit cluster configurations may no longer be required. This is because once a cluster leader has been elected, the leader will replicate its cluster configuration to the rest of the cluster, and the user-defined configuration will be replaced by an internal configuration.
To configure the CopyCat cluster, create a ClusterConfig.
ClusterConfig cluster = new ClusterConfig();
Each cluster configuration must contain a local member and a set of remote members. To define the local member, pass the member address to the cluster configuration constructor or call the setLocalMember method.
cluster.setLocalMember("tcp://localhost:1234");
Note that the configuration uses URIs to identify nodes. The protocol used by CopyCat is pluggable, so member address formats may differ depending on the protocol you're using.
To set remote cluster members, use the setRemoteMembers or addRemoteMember method:
cluster.addRemoteMember("tcp://localhost:1235");
cluster.addRemoteMember("tcp://localhost1236");
Creating a dynamic cluster
The CopyCat cluster configuration is Observable, and once the local node is elected leader, it will begin observing the configuration for changes.
Once the local node has been started, simply adding or removing nodes from the observable ClusterConfig may cause the replica's configuration to be updated. However, it's important to remember that as with commands, configuration changes must go through the cluster leader and be replicated to the rest of the cluster. This allows CopyCat to ensure that logs remain consistent while nodes are added or removed, but it also means that cluster configuration changes may not be propagated for some period of time after the ClusterConfig is updated.
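For example, to add a node to a live cluster, simply mutate the observable configuration (the new address below is illustrative):

// The running cluster observes this configuration, so the leader will
// pick up the change and replicate it safely; the change may take some
// time to propagate.
cluster.addRemoteMember("tcp://localhost:8083");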
CopyCat replicas are run via a CopyCatContext. The CopyCat context is a container for the replica's Log, StateMachine, and ClusterConfig. To start the replica, simply call the start() method.
CopyCatContext context = new CopyCatContext(stateMachine, cluster);
context.start();
Setting the log type
By default, CopyCat uses an in-memory log for simplicity. However, in production users should use a disk-based log. CopyCat provides two Log implementations:
- MemoryLog - an in-memory TreeMap based log
- FileLog - a file-based log
To set the log to be used by a CopyCat replica, simply pass the log instance in the CopyCatContext constructor:
CopyCatContext context = new CopyCatContext(new MyStateMachine(), new FileLog("my.log"), cluster);
context.start();
Submitting commands to the cluster
To submit commands to the CopyCat cluster, simply call the submitCommand method on any CopyCatContext.
context.submitCommand("get", "foo").thenAccept((result) -> System.out.println(result));
The CopyCatContext API supports an arbitrary number of positional arguments. When a command is submitted, the context will return a CompletableFuture which will be completed once the command result is received. The command will automatically be forwarded on to the current cluster leader. If the cluster does not have any currently elected leader (or the node to which the command is submitted doesn't know of any cluster leader) then the submission will fail.
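Since a submission fails when no leader is known, it's worth handling the error side of the returned CompletableFuture; a minimal sketch:

context.submitCommand("get", "foo").whenComplete((result, error) -> {
  if (error != null) {
    // No known leader, or the submission failed for another reason.
    System.err.println("Command failed: " + error.getMessage());
  } else {
    System.out.println("Result: " + result);
  }
});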
CopyCat provides a pluggable serialization API that allows users to control how log entries are serialized to the log. CopyCat provides two serializer implementations: one using the core Java serialization mechanism, and one Jackson based serializer (the default). To configure the serializer in use, add a file to the classpath at META-INF/services/net/kuujo/copycat/Serializer containing the serializer factory class name:
- net.kuujo.copycat.serializer.impl.JacksonSerializerFactory (default)
- net.kuujo.copycat.serializer.impl.JavaSerializerFactory
CopyCat locates log serializers via its custom service loader implementation. To provide a custom serializer to CopyCat, simply add a configuration file to your classpath at META-INF/services/net/kuujo/copycat/Serializer containing the SerializerFactory implementation class name.
public class JacksonSerializerFactory extends SerializerFactory {
  @Override
  public Serializer createSerializer() {
    return new JacksonSerializer();
  }
}
The serializer factory should return a Serializer instance via the createSerializer method. This is the basic default serializer used by CopyCat:
public class JacksonSerializer implements Serializer {
  private final ObjectMapper mapper = new ObjectMapper();

  @Override
  public byte[] writeValue(Object value) {
    try {
      return mapper.writeValueAsBytes(value);
    } catch (JsonProcessingException e) {
      throw new SerializationException(e.getMessage());
    }
  }

  @Override
  public <T> T readValue(byte[] bytes, Class<T> type) {
    try {
      return mapper.readValue(bytes, type);
    } catch (IOException e) {
      throw new SerializationException(e.getMessage());
    }
  }

}
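As a quick usage sketch of the Serializer interface shown above, round-tripping a value through the default serializer:

Serializer serializer = new JacksonSerializer();

// Serialize a value to bytes and read it back.
byte[] bytes = serializer.writeValue(Collections.singletonMap("key", "foo"));
Map result = serializer.readValue(bytes, Map.class);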
CopyCat is an abstract API that can implement the Raft consensus algorithm over any conceivable protocol. To do this, CopyCat provides a flexible protocol plugin system. Protocols use special URIs - such as local:foo or tcp://localhost:5050 - and CopyCat uses a custom service loader similar to the Java service loader. Using URIs, a protocol can be constructed and started by CopyCat without the large amounts of boilerplate code that would otherwise be required.
To define a new protocol, simply create a file in your project's META-INF/services/net/kuujo/copycat/protocol directory, naming the file with the protocol name. The file should contain a single string indicating the name of the protocol class. For example:

META-INF/services/net/kuujo/copycat/protocol/http

net.kuujo.copycat.protocol.impl.HttpProtocol
The class name should point to a class that implements the Protocol interface. The Protocol interface provides the following methods:
public interface Protocol {
  ProtocolClient createClient();
  ProtocolServer createServer();
}
You'll notice that the Protocol itself doesn't actually do anything. Instead, it simply provides factories for ProtocolClient and ProtocolServer.
The ProtocolServer interface is implemented by the receiving side of the protocol. The server's task is quite simple - to call replica callbacks when a message is received. In order to do so, the ProtocolServer provides a number of methods for registering callbacks for each command. Each callback is in the form of an AsyncCallback instance.
public interface ProtocolServer {
  void protocolHandler(ProtocolHandler handler);
  void start(AsyncCallback<Void> callback);
  void stop(AsyncCallback<Void> callback);
}
Each ProtocolServer should only ever have a single callback for each method registered at any given time. When the start and stop methods are called, the server should obviously start and stop servicing requests respectively. It's up to the server to transform wire-level messages into the appropriate Request instances.
The ProtocolHandler that the server calls implements the same interface as the ProtocolClient below.
The client's task is equally simple - to send messages to another replica when asked. In order to do so, the ProtocolClient implements the other side of the ProtocolServer callback methods.
public interface ProtocolClient {
  CompletableFuture<AppendEntriesResponse> appendEntries(AppendEntriesRequest request);
  CompletableFuture<RequestVoteResponse> requestVote(RequestVoteRequest request);
  CompletableFuture<SubmitCommandResponse> submitCommand(SubmitCommandRequest request);
}
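Because the ProtocolHandler exposes these same three methods, a minimal (hypothetical) client for same-JVM testing could simply delegate each call to the server's registered handler, much as the built-in local protocol presumably does:

// Hypothetical sketch, not a built-in class: delegates directly to a
// server's ProtocolHandler in the same JVM.
public class DirectProtocolClient implements ProtocolClient {
  private final ProtocolHandler handler; // the "remote" server's handler

  public DirectProtocolClient(ProtocolHandler handler) {
    this.handler = handler;
  }

  @Override
  public CompletableFuture<AppendEntriesResponse> appendEntries(AppendEntriesRequest request) {
    return handler.appendEntries(request);
  }

  @Override
  public CompletableFuture<RequestVoteResponse> requestVote(RequestVoteRequest request) {
    return handler.requestVote(request);
  }

  @Override
  public CompletableFuture<SubmitCommandResponse> submitCommand(SubmitCommandRequest request) {
    return handler.submitCommand(request);
  }
}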
You may be wondering how you can get URI arguments into your protocol implementation.
CopyCat provides special annotations that can be used to inject specific URI parts
into your protocol. Each URI annotation directly mirrors a method on the Java URI
interface, so users can extract any information necessary out of the protocol URI.
These are the available URI annotations:
- @UriScheme
- @UriSchemeSpecificPart
- @UriUserInfo
- @UriHost
- @UriPort
- @UriAuthority
- @UriPath
- @UriQuery
- @UriQueryParam
- @UriFragment
Each of these annotations mirrors a method on the URI interface except for the last one, @UriQueryParam. The @UriQueryParam annotation is a special annotation for referencing parsed named query arguments.
URI arguments are injected into protocol instances via bean properties or annotated fields. Let's take a look at an example of setter injection:
public class HttpProtocol implements Protocol {
  private String host;

  @UriHost
  public void setHost(String host) {
    this.host = host;
  }

  public String getHost() {
    return host;
  }
}
By default, if a bean property is not annotated with any URI annotation, the URI injector will attempt to inject a named query parameter into the setter.
public class HttpProtocol implements Protocol {
  private String host;

  public void setHost(String host) {
    this.host = host;
  }

  public String getHost() {
    return host;
  }
}
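With the un-annotated setter above, a (hypothetical) member URI carrying a matching query parameter would inject "localhost" into setHost:

String uri = "http://copycat?host=localhost";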
Obviously, URI parameters are limited in type. The CopyCat URI injector supports complex types via a Registry instance. Registry lookups can be performed by prefixing @UriQueryParam names with the $ prefix.
Registry registry = new BasicRegistry();
registry.bind("http_client", new HttpClient("localhost", 8080));
String uri = "http://copycat?client=$http_client";
The URI above can be used to inject an HttpClient into the HttpProtocol via the Registry.
public class HttpProtocol implements Protocol {
  private HttpClient client;

  public void setClient(HttpClient client) {
    this.client = client;
  }

  public HttpClient getClient() {
    return client;
  }
}
The URI injector can also inject annotated fields directly.
public class HttpProtocol implements Protocol {
  @UriHost private String host;
  @UriPort private int port;
}
The CopyCatContext is automatically injected into any Protocol implementation. When URI injection occurs, the URI injector searches fields for a CopyCatContext type field and automatically injects the context into that field.
public class HttpProtocol implements Protocol {
  private CopyCatContext context;
}
Now that we have all that out of the way, here's the complete Protocol implementation:
public class HttpProtocol implements Protocol {
  private String host;
  private int port;
  private String path;

  public HttpProtocol() {
  }

  @UriHost
  public void setHost(String host) {
    this.host = host;
  }

  public String getHost() {
    return host;
  }

  @UriPort
  public void setPort(int port) {
    this.port = port;
  }

  public int getPort() {
    return port;
  }

  @UriPath
  public void setPath(String path) {
    this.path = path;
  }

  public String getPath() {
    return path;
  }

  @Override
  public ProtocolClient createClient() {
    return new HttpProtocolClient(new HttpClient(host, port), path);
  }

  @Override
  public ProtocolServer createServer() {
    return new HttpProtocolServer(new HttpServer(host, port), path);
  }
}
Let's take a look at an example of how to configure the CopyCat cluster when using custom protocols.
ClusterConfig cluster = new ClusterConfig("http://localhost:8080/copycat");
cluster.addRemoteMember("http://localhost:8081/copycat");
cluster.addRemoteMember("http://localhost:8082/copycat");
Note that CopyCat does not particularly care about the protocol of any given node in the cluster. Theoretically, different nodes could be connected together by any protocol they want (though I can't imagine why one would want to do such a thing). For the local replica, the protocol's server is used to receive messages. For remote replicas, each protocol instance's client is used to send messages to those replicas.
CopyCat maintains several built-in protocols, some of which are implemented on top of asynchronous frameworks like Netty and Vert.x.
The local protocol is a simple protocol that communicates between contexts using direct method calls. This protocol is intended purely for testing.
Registry registry = new ConcurrentRegistry();
ClusterConfig cluster = new ClusterConfig("local:foo");
cluster.setRemoteMembers("local:bar", "local:baz");
CopyCatContext context = new CopyCatContext(new MyStateMachine(), cluster, registry);
Note that you should use a ConcurrentRegistry when using the local protocol.
The Netty tcp protocol communicates between replicas using Netty TCP channels. In order to use Netty protocols, you must add the copycat-netty project as a dependency. Once the copycat-netty library is available on your classpath, CopyCat will automatically find the Netty tcp protocol.
ClusterConfig cluster = new ClusterConfig("tcp://localhost:1234");
The Vert.x eventbus protocol communicates between replicas on the Vert.x event bus. The event bus can either be created in a new Vertx instance or referenced in an existing Vertx instance.
In order to use Vert.x protocols, you must add the copycat-vertx project as a dependency.
ClusterConfig cluster = new ClusterConfig("eventbus://localhost:1234/foo");
// or...
ClusterConfig cluster = new ClusterConfig("eventbus://foo?vertx=$vertx");
Registry registry = new BasicRegistry();
registry.bind("vertx", vertx);
The Vert.x tcp protocol communicates between replicas using a simple wire protocol over Vert.x NetClient and NetServer instances. To configure the tcp protocol, simply use an ordinary TCP address.
In order to use Vert.x protocols, you must add the copycat-vertx project as a dependency.
ClusterConfig cluster = new ClusterConfig("tcp://localhost:1234");
cluster.setRemoteMembers("tcp://localhost:1235", "tcp://localhost:1236");
We've gone through developing custom protocols for communicating between nodes in CopyCat, but a replicated state machine isn't much good if you can't get information into or out of it. Nodes need some sort of API that can be exposed to the outside world. For this, CopyCat provides an endpoints API that behaves very similarly to the protocol API.
To register an endpoint, simply add a file to the META-INF/services/net/kuujo/copycat/endpoints directory, using the endpoint name as the file name. The file should contain a string referencing a class that implements the Endpoint interface.
The Endpoint interface is very simple:
public interface Endpoint {
  void start(AsyncCallback<Void> callback);
  void stop(AsyncCallback<Void> callback);
}
Endpoints simply wrap the CopyCatContext and forward requests to the local context via the CopyCatContext.submitCommand method.
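A minimal sketch of a custom endpoint along these lines, assuming a hypothetical transport that hands us a command name and arguments, and assuming the context is injected as it is for protocols:

public class MyEndpoint implements Endpoint {
  private CopyCatContext context; // assumed to be injected, as with protocols

  // Called by the (hypothetical) transport when a request arrives.
  private void handleRequest(String command, Object... args) {
    context.submitCommand(command, args).whenComplete((result, error) -> {
      // Translate the result (or failure) into a transport-level response here.
    });
  }

  @Override
  public void start(AsyncCallback<Void> callback) {
    // Start the underlying server, then invoke the callback.
  }

  @Override
  public void stop(AsyncCallback<Void> callback) {
    // Stop the underlying server, then invoke the callback.
  }
}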
Endpoints support all the same URI annotations as do protocols. See the protocol documentation on URI annotations for a tutorial on injecting arguments into custom endpoints.
CopyCat provides a simple helper class for wrapping a CopyCatContext in an endpoint. To wrap a context, use the CopyCat class. The CopyCat constructor simply accepts a single additional argument, which is the endpoint URI.
ClusterConfig cluster = new ClusterConfig("tcp://localhost:5555", "tcp://localhost:5556", "tcp://localhost:5557");
CopyCat copycat = new CopyCat("http://localhost:8080", new MyStateMachine(), cluster)
copycat.start();
CopyCat also provides a couple of built-in endpoints via the copycat-vertx project. In order to use Vert.x endpoints, you must add the copycat-vertx project as a dependency.
The Vert.x event bus endpoint receives submissions over the Vert.x event bus.
CopyCat copycat = new CopyCat("eventbus://localhost:8080/foo", new MyStateMachine(), cluster);
// or...
Registry registry = new BasicRegistry();
registry.bind("vertx", vertx);
CopyCat copycat = new CopyCat("eventbus://foo?vertx=$vertx", new MyStateMachine(), cluster, registry);
The Vert.x TCP endpoint uses a delimited JSON-based protocol:
CopyCat copycat = new CopyCat("tcp://localhost:8080", new MyStateMachine(), cluster);
The Vert.x HTTP endpoint uses a simple HTTP server which accepts JSON POST requests to the command path. For instance, to submit the read command with {"key":"foo"}, execute a POST request to the /read path using a JSON body containing the command arguments.
CopyCat copycat = new CopyCat("http://localhost:8080", new MyStateMachine(), cluster);