8000 GitHub - grze/copycat: Protocol-agnostic implementation of the Raft consensus algorithm
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
10000 / copycat Public
forked from atomix/atomix

Protocol-agnostic implementation of the Raft consensus algorithm

License

Notifications You must be signed in to change notification settings

grze/copycat

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

CopyCat

CopyCat is an extensible Java-based implementation of the Raft consensus protocol.

The core of CopyCat is a framework designed to support a variety of protocols and transports. CopyCat provides a simple extensible API that can be used to build a strongly consistent, fault/partition tolerant state machine over any mode of communication. CopyCat's Raft implementation also supports advanced features of the Raft algorithm such as snapshotting and dynamic cluster configuration changes and provides additional optimizations for various scenarios such as failure detection and read-only state queries.

CopyCat is a pluggable framework, providing protocol and endpoint implementations for various frameworks such as Netty and Vert.x.

Note that this version requires Java 8. There is also a Java 7 compatible version

User Manual

  1. A Brief Introduction
  2. How it works
  3. Getting started
  4. Serialization
  5. Protocols
  6. Endpoints

A brief introduction

CopyCat is a "protocol agnostic" implementation of the Raft consensus algorithm. It provides a framework for constructing a partition-tolerant replicated state machine over a variety of wire-level protocols. It sounds complicated, but the API is actually quite simple. Here's a quick example.

public class KeyValueStore implements StateMachine {
  @Stateful
  private Map<String, Object> data = new HashMap<>();

  @Command(type = Command.Type.READ)
  public Object get(String key) {
    return data.get(key);
  }

  @Command(type = Command.Type.WRITE)
  public void set(String key, Object value) {
    data.put(key, value);
  }

  @Command(type = Command.Type.WRITE)
  public void delete(String key) {
    data.remove(key);
  }

}

Here we've created a simple key value store. By deploying this state machine on several nodes in a cluster, CopyCat will ensure commands (i.e. get, set, and delete) are applied to the state machine in the order in which they're submitted to the cluster (log order). Internally, CopyCat uses a replicated log to order and replicate commands, and it uses leader election to coordinate log replication. When the replicated log grows too large, CopyCat will take a snapshot of the @Stateful state machine state and compact the log.

Log log = new FileLog("key-value.log");

To configure the CopyCat cluster, we simply create a ClusterConfig.

ClusterConfig cluster = new ClusterConfig();
cluster.setLocalMember("tcp://localhost:8080");
cluster.setRemoteMembers("tcp://localhost:8081", "tcp://localhost:8082");

Note that the cluster configuration identifies a particular protocol, tcp. These are the endpoints the nodes within the CopyCat cluster use to communicate with one another. CopyCat provides a number of different protocol and endpoint implementations.

Additionally, CopyCat cluster membership is dynamic, and the ClusterConfig is Observable. This means that if the ClusterConfig is changed while the cluster is running, CopyCat will pick up the membership change and replicate the cluster configuration in a safe manner.

Now that the cluster has been set up, we simply create a CopyCat instance, specifying an endpoint through which the outside world can communicate with the cluster.

CopyCat copycat = new CopyCat("http://localhost:5000", new KeyValueStore(), log, cluster);
copycat.start();

That's it! We've just created a strongly consistent, fault-tolerant key-value store with an HTTP API in less than 25 lines of code!

public class StronglyConsistentFaultTolerantAndTotallyAwesomeKeyValueStore implements StateMachine {

  public static void main(String[] args) {
    // Create the local file log.
    Log log = new FileLog("key-value.log");

    // Configure the cluster.
    ClusterConfig cluster = new ClusterConfig();
    cluster.setLocalMember("tcp://localhost:8080");
    cluster.setRemoteMembers("tcp://localhost:8081", "tcp://localhost:8082");

    // Create and start a server at localhost:5000.
    new CopyCat("http://localhost:5000", new StronglyConsistentFaultTolerantAndTotallyAwesomeKeyValueStore(), log, cluster).start();
  }

  @Stateful
  private Map<String, Object> data = new HashMap<>();

  @Command(type = Command.Type.READ)
  public Object get(String key) {
    return data.get(key);
  }

  @Command(type = Command.Type.WRITE)
  public void set(String key, Object value) {
    data.put(key, value);
  }

  @Command(type = Command.Type.WRITE)
  public void delete(String key) {
    data.remove(key);
  }

}

We can now execute commands on the state machine by making POST requests to the HTTP interface.

POST http://localhost:5000/set
["foo", "Hello world!"]

200 OK

POST http://localhost:5000/get
["foo"]

200 OK

{
  "result": "Hello world!"
}

POST http://localhost:5000/delete
["foo"]

200 OK

CopyCat doesn't require that commands be submitted via an endpoint. Rather than submitting commands via the HTTP endpoint, simply construct a CopyCatContext instance and submit commands directly to the cluster via the context.

CopyCatContext context = new CopyCatContext(new KeyValueStore(), log, cluster);

// Set a key in the key-value store.
context.submitCommand("set", "foo", "Hello world!").thenRun(() -> {
  context.submitCommand("get", "foo").whenComplete((result, error) {
    context.submitCommand("delete", "foo").thenRun(() -> System.out.println("Deleted 'foo'"));
  });
});

How it works

CopyCat uses a Raft-based consensus algorithm to perform leader election and state replication. Each node in a CopyCat cluster may be in one of three states at any given time - follower, candidate, or leader. Each node in the cluster maintains an internal log of replicated commands. When a command is submitted to a CopyCat cluster, the command is forwarded to the cluster leader. The leader then logs the command and replicates it to a majority of the cluster. Once the command has been replicated, it applies the command to its local state machine and replies with the command result.

For the most in depth description of how CopyCat works, see In Search of an Understandable Consensus Algorithm by Diego Ongaro and John Ousterhout.

State Machines

Each node in a CopyCat cluster contains a state machine to which the node applies commands sent to the cluster. State machines are simply classes that implement CopyCat's StateMachine interface, but there are a couple of very important aspects to note about state machines.

  • Given the same commands in the same order, state machines should always arrive at the same state with the same output.
  • Following from that rule, each node's state machine should be identical.

Commands

Commands are state change instructions that are submitted to the CopyCat cluster and ultimately applied to the state machine. For instance, in a key-value store, a command could be something like get or set. CopyCat provides unique features that allow it to optimize handling of certain command types. For instance, read-only commands which don't contribute to the system state will not be replicated, but commands that do alter the system state are eventually replicated to all the nodes in the cluster.

Logs

CopyCat nodes share state using a replicated log. When a command is submitted to the CopyCat cluster, the command is appended to the leader's log and replicated to other nodes in the cluster. If a node dies and later restarts, the node will rebuild its internal state by reapplying logged commands to its state machine. The log is essential to the operation of the CopyCat cluster in that it both helps maintain consistency of state across nodes and assists in leader election. CopyCat provides both in-memory logs for testing and file-based logs for production.

Snapshots

In order to ensure logs do not grow too large for the disk, CopyCat replicas periodically take and persist snapshots of the state machine state. CopyCat manages snapshots using a method different than is described in the original Raft paper. Rather than persisting snapshots to separate snapshot files and replicating snapshots using an additional RCP method, CopyCat appends snapshots directly to the log. This allows CopyCat to minimize complexity by transfering snapshots as a normal part of log replication.

Normally, when a node crashes and recovers, it restores its state using the latest snapshot and then rebuilds the rest of its state by reapplying committed command entries. But in some cases, replicas can become so far out of sync that the cluster leader has already written a new snapshot to its log. In that case, the leader will replicate its latest snapshot to the recovering node, allowing it to start with the leader's snapshot.

Cluster Configurations

CopyCat replicas communicate with one another through a user-defined cluster configuration. The CopyCat cluster is very flexible, and the protocol underlying the CopyCat cluster is pluggable. Additionally, for cases where cluster configuration may change over time, CopyCat supports runtime cluster configuration changes. As with other state changes, all cluster configuration changes are performed through the cluster leader.

Leader Election

CopyCat clusters use leader election to maintain synchronization of nodes. In CopyCat, all state changes are performed via the leader. In other words, commands that are submitted to the cluster are always forwarded to the leader for processing. This leads to a much simpler implementation of single-copy consistency.

Raft's leader election algorithm relies largely on the log to elect a leader. Each node in the cluster can serve one of three roles at any given time - follower, candidate, and leader - each of which serve a specific role in contributing to elections, replicating logs, and maintaining general consistency of state across the cluster.

Followers

When a node is first started, it is initialized as a follower. The follower serves only to listen for synchronization requests from cluster leaders and apply replicated log entries to its state machine. If the follower does not receive a request from the cluster leader for a configurable amount of time, it will transition to a candidate and start a new election.

Candidates

The candidate role occurs when a replica is announcing its candidacy to become the cluster leader. Candidacy occurs when a follower has not heard from the cluster leader for a configurable amount of time. When a replica becomes a candidate, it requests votes from each other member of the cluster. The voting algorithm is essential to the consistency of CopyCat logs. When a replica receives a vote request, it compares the status of the requesting node's log to its own log and decides whether to vote for the candidate based on how up-to-date the candidates log is. This ensures that only replicas with the most up-to-date logs can become the cluster leader.

Leaders

Once a replica has won an election, it transitions to the leader role. The leader is the node through which all commands and configuration changes are performed. When a command is submitted to the cluster, the command is forwarded to the leader. Based on the command type - read or write - the leader will then replicate the command to its followers, apply it to its state machine, and return the result. The leader is also responsible for maintaining log consistency during cluster configuration changes.

Protocols

The copycat-core project is purely an implementation of the Raft consensus algorithm. It does not implement any specific transport aside from a local transport for testing. Instead, the CopyCat API is designed to allow users to implement the transport layer using the Protocol API. CopyCat does, however, provide some core protocol implementations in the copycat-netty and copycat-vertx projects.

Endpoints

CopyCat provides a framework for building fault-tolerant state machines on the Raft consensus algorithm, but fault-tolerant distributed systems are of no use if they can't be accessed from the outside world. CopyCat's Endpoint API facilitates creating user-facing interfaces (servers) for submitting commands to the CopyCat cluster.

Getting Started

Creating a state machine

To create a state machine in CopyCat, simply implement the StateMachine interface.

public class MyStateMachine implements StateMachine {
}

The state machine interface is simply an identifier interface. All public methods within a state machine can be called as CopyCat commands. This is important to remember.

Providing command types

When a command is submitted to the CopyCat cluster, the command is first written to a log and replicated to other nodes in the cluster. Once the log entry has been replicated to a majority of the cluster, the command is applied to the leader's state machine and the response is returned.

In many cases, CopyCat can avoid writing any data to the log at all. In particular, when a read-only command is submitted to the cluster, CopyCat does not need to log and replicate that command since it has no effect on the machine state. Instead, CopyCat can simply ensure that the cluster is in sync, apply the command to the state machine, and return the result. However, in order for it to do this it needs to have some additional information about each specific command.

To provide command information to CopyCat, use the @Command annotation on a command method.

Each command can have one of three types:

  • READ
  • WRITE
  • READ_WRITE

By default, all commands are of the type READ_WRITE. While there is no real difference between the WRITE and READ_WRITE types, the READ type is important in allowing CopyCat to identify read-only commands. It is important that any command identified as a READ command never modify the machine state.

Let's look at an example of a command provider:

public class MyStateMachine implements StateMachine {
  private final Map<String, Object> data = new HashMap<>();

  @Command(type = Command.Type.READ)
  public Object read(String key) {
    return data.get(key);
  }

  @Command(type = Command.Type.WRITE)
  public void write(String key, Object value) {
    data.put(key, value);
  }

}

Taking snapshots

One of the issues with a replicated log is that over time it will only continue to grow. There are a couple of ways to potentially handle this, and CopyCat uses the snapshotting method that is recommended by the authors of the Raft algorithm. However, in favor of simplicity, the CopyCat snapshotting implementation does slightly differ from the one described in the Raft paper. Rather than storing snapshots in a separate snapshot file, CopyCat stores snapshots as normal log entries, making it easier to replicate snapshots when replicas fall too far out of sync. Additionally, CopyCat guarantees that the first entry in any log will always be a SnapshotEntry, again helping to ease the process of replicating snapshots to far out-of-date replicas.

All snapshot serialization, storage, and loading is handled by CopyCat internally. Users need only annotated stateful fields with the @Stateful annotation. Once the log grows to a predetermined size (configurable in CopyCatConfig), CopyCat will take a snaphsot of the log and wipe all previous log entries.

public class MyStateMachine implements StateMachine {
  @Stateful
  private Map<String, Object> data = new HashMap<>();
}

Configuring the replica

CopyCat exposes a configuration API that allows users to configure how CopyCat behaves during elections and how it replicates commands. To configure a CopyCat replica, create a CopyCatConfig to pass to the CopyCatContext constructor.

CopyCatConfig config = new CopyCatConfig();

The CopyCatConfig exposes the following configuration methods:

  • setElectionTimeout/withElectionTimeout - sets the timeout within which a follower must receive an AppendEntries request from the leader before starting a new election. This timeout is also used to calculate the election timeout during elections. Defaults to 2000 milliseconds
  • setHeartbeatInterval/withHeartbeatInterval - sets the interval at which the leader will send heartbeats (AppendEntries requests) to followers. Defaults to 500 milliseconds
  • setRequireWriteQuorum/withRequireWriteQuorum - sets whether to require a quorum during write operations. It is strongly recommended that this remain enabled for consistency. Defaults to true (enabled)
  • setRequireReadQuorum/withRequireReadQuorum - sets whether to require a quorum during read operations. Read quorums can optionally be disabled in order to improve performance at the risk of reading stale data. When read quorums are disabled, the leader will immediately respond to READ type commands by applying the command to its state machine and returning the result. Defaults to true (enabled)
  • setMaxLogSize/withMaxLogSize - sets the maximum log size before a snapshot should be taken. As entries are appended to the local log, replicas will monitor the size of the local log to determine whether a snapshot should be taken based on this value. Defaults to 32 * 1024^2
  • setCorrelationStrategy/withCorrelationStrategy - CopyCat generates correlation identifiers for each request/response pair that it sends. This helps assist in correlating messages across various protocols. By default, CopyCat uses UuidCorrelationStrategy - a UUID based correlation ID generator, but depending on the protocol being used, the MonotonicCorrelationStrategy which generates monotonically increasing IDs may be safe to use (it's safe with all core CopyCat protocols). Defaults to UuidCorrelationStrategy
  • setTimerStrategy/withTimerStrategy - sets the replica timer strategy. This allows users to control how CopyCat's internal timers work. By default, replicas use a ThreadTimerStrategy which uses the Java Timer to schedule delays on a background thread. Users can implement their own TimerStrategy in order to, for example, implement an event loop based timer. Timers should return monotonically increasing timer IDs that never repeat. Note also that timers do not need to be multi-threaded since CopyCat only sets a timeout a couple time a second. Defaults to ThreadTimerStrategy

Configuring the cluster

When a CopyCat cluster is first started, the cluster configuration must be explicitly provided by the user. However, as the cluster runs, explicit cluster configurations may no longer be required. This is because once a cluster leader has been elected, the leader will replicate its cluster configuration to the rest of the cluster, and the user-defined configuration will be replaced by an internal configuration.

To configure the CopyCat cluster, create a ClusterConfig.

ClusterConfig cluster = new ClusterConfig();

Each cluster configuration must contain a local member and a set of remote members. To define the local member, pass the member address to the cluster configuration constructor or call the setLocalMember method.

cluster.setLocalMember("tcp://localhost:1234");

Note that the configuration uses URIs to identify nodes. The protocol used by CopyCat is pluggable, so member address formats may differ depending on the protocol you're using.

To set remote cluster members, use the setRemoteMembers or addRemoteMember method:

cluster.addRemoteMember("tcp://localhost:1235");
cluster.addRemoteMember("tcp://localhost1236");

Creating a dynamic cluster

The CopyCat cluster configuration is Observable, and once the local node is elected leader, it will begin observing the configuration for changes.

Once the local node has been started, simply adding or removing nodes from the observable ClusterConfig may cause the replica's configuration to be updated. However, it's important to remember that as with commands, configuration changes must go through the cluster leader and be replicated to the rest of the cluster. This allows CopyCat to ensure that logs remain consistent while nodes are added or removed, but it also means that cluster configuration changes may not be propagated for some period of time after the ClusterConfig is updated.

Creating the CopyCatContext

CopyCat replicas are run via a CopyCatContext. The CopyCat context is a container for the replica's Log, StateMachine, and ClusterConfig. To start the replica, simply call the start() method.

CopyCatContext context = new CopyCatContext(stateMachine, cluster);
context.start();

Setting the log type

By default, CopyCat uses an in-memory log for simplicity. However, in production users should use a disk-based log. CopyCat provides two Log implementations:

  • MemoryLog - an in-memory TreeMap based log
  • FileLog - a file-based log

To set the log to be used by a CopyCat replica, simply pass the log instance in the CopyCatContext constructor:

CopyCatContext context = new CopyCatContext(new MyStateMachine(), new FileLog("my.log"), cluster);
context.start();

Submitting commands to the cluster

To submit commands to the CopyCat cluster, simply call the submitCommand method on any CopyCatContext.

context.submitCommand("get", "foo").thenAccept((result) -> System.out.println(result));

The CopyCatContext API is supports an arbitrary number of positional arguments. When a command is submitted, the context will return a CompletableFuture which will be completed once the command result is received. The command will automatically be forwarded on to the current cluster leader. If the cluster does not have any currently elected leader (or the node to which the command is submitted doesn't know of any cluster leader) then the submission will fail.

Serialization

CopyCat provides a pluggable serialization API that allows users to control how log entries are serialized to the log. CopyCat provides two serializer implementations, one using the core Java serialization mechanism, and one Jackson based serializer (the default). To configure the serializer in use, add a file to the classpath at META-INF/services/net/kuujo/copycat/Serializer containing the serializer factory class name:

  • net.kuujo.copycat.setializer.impl.JacksonSerializerFactory (default)
  • net.kuujo.copycat.serializer.impl.JavaSerializerFactory

Providing custom log serializers

CopyCat locates log serializers via its custom service loader implementation. To provide a custom serializer to CopyCat, simply add a configuration file to your classpath at META-INF/services/net/kuujo/copycat/Serializer containing the SerializerFactory implementation class name.

public class JacksonSerializerFactory extends SerializerFactory {

  @Override
  public Serializer createSerializer() {
    return new JacksonSerializer();
  }

}

The serializer factory should return a Serializer instance via the createSerializer method. This is the basic default serializer used by CopyCat:

public class JacksonSerializer implements Serializer {
  private final ObjectMapper mapper = new ObjectMapper();

  @Override
  public byte[] writeValue(Object value) {
    try {
      return mapper.writeValueAsBytes(value);
    } catch (JsonProcessingException e) {
      throw new SerializationException(e.getMessage());
    }
  }

  @Override
  public <T> T readValue(byte[] bytes, Class<T> type) {
    try {
      return mapper.readValue(bytes, type);
    } catch (IOException e) {
      throw new SerializationException(e.getMessage());
    }
  }

}

Protocols

CopyCat is an abstract API that can implement the Raft consensus algorithm over any conceivable protocol. To do this, CopyCat provides a flexible protocol plugin system. Protocols use special URIs - such as local:foo or tcp://localhost:5050 - and CopyCat uses a custom service loader similar to the Java service loader. Using URIs, a protocol can be constructed and started by CopyCat without the large amounts of boilerplate code that would otherwise be required.

To define a new protocol, simply create a file in your project's META-INF/services/net/kuujo/copycat/protocol directory, naming the file with the project name. In the file should be a single string indicating the name of the protocol class. For example:

META-INF/services/net/kuujo/copycat/protocol/http

net.kuujo.copycat.protocol.impl.HttpProtocol

The class name should point to a class that implements the Protocol interface. The Protocol interface provides the following methods:

public interface Protocol {

  ProtocolClient createClient();

  ProtocolServer createServer();

}

You'll notice that the Protocol itself doesn't actually do anything. Instead, it simply provides factories for ProtocolClient and ProtocolServer.

Writing a protocol server

The ProtocolServer interface is implemented by the receiving side of the protocol. The server's task is quite simple - to call replica callbacks when a message is received. In order to do so, the ProtocolServer provides a number of methods for registering callbacks for each command. Each callback is in the form of an AsyncCallback instance.

public interface ProtocolServer {

  void protocolHandler(ProtocolHandler handler);

  void start(AsyncCallback<Void> callback);

  void stop(AsyncCallback<Void> callback);

}

Each ProtocolServer should only ever have a single callback for each method registered at any given time. When the start and stop methods are called, the server should obviously start and stop servicing requests respectively. It's up to the server to transform wire-level messages into the appropriate Request instances.

The ProtocolHandler to which the server calls implements the same interface as ProtocolClient below.

Writing a protocol client

The client's task is equally simple - to send messages to another replica when asked. In order to do so, the ProtocolClient implements the other side of the ProtocolServer callback methods.

public interface ProtocolClient {

  CompletableFuture<AppendEntriesResponse> appendEntries(AppendEntriesRequest request);

  CompletableFuture<RequestVoteResponse> requestVote(RequestVoteRequest request);

  CompletableFuture<SubmitCommandResponse> submitCommand(SubmitCommandRequest request);

}

Injecting URI arguments into a protocol

You may be wondering how you can get URI arguments into your protocol implementation. CopyCat provides special annotations that can be used to inject specific URI parts into your protocol. Each URI annotation directly mirrors a method on the Java URI interface, so users can extract any information necessary out of the protocol URI.

These are the available URI annotations:

  • @UriScheme
  • @UriSchemeSpecificPart
  • @UriUserInfo
  • @UriHost
  • @UriPort
  • @UriAuthority
  • @UriPath
  • @UriQuery
  • @UriQueryParam
  • @UriFragment

Each of these annotations mirrors a method on the URI interface except for the last one, @UriQueryParam. The @UriQueryParam annotation is a special annotation for referencing parsed named query arguments.

URI arguments are injected into protocol instances via bean properties or annotated fields. Let's take a look at an example of setter injection:

public class HttpProtocol implements Protocol {
  private String host;

  @UriHost
  public void setHost(String host) {
    this.host = host;
  }

  public String getHost() {
    return host;
  }

}

By default, if a bean property is not annotated with any URI annotation, the URI injector will attempt to inject a named query parameter into the setter.

public class HttpProtocol implements Protocol {
  private String host;

  public void setHost(String host) {
    this.host = host;
  }

  public String getHost() {
    return host;
  }

}

Obviously URI parameters are limiting in type. The CopyCat URI injector supports complex types via a Registry instance. Registry lookups can be performed by prefixing @UriQueryParam names with the $ prefix.

Registry registry = new BasicRegistry();
registry.bind("http_client", new HttpClient("localhost", 8080));
String uri = "http://copycat?client=$http_client";

The URI above can be used to inject an HttpClient into the HttpProtocol via the Registry.

public class HttpProtocol implements Protocol {
  private HttpClient client;

  public void setClient(HttpClient client) {
    this.client = client;
  }

  public HttpClient getClient() {
    return client;
  }

}

The URI injector can also inject annotated fields directly.

public class HttpProtocol implements Protocol {
  @UriHost private String host;
  @UriPort private int port;
}

Injecting the CopyCatContext

The CopyCatContext is automatically injected into any Protocol implementation. When URI injection occurs, the URI injector searches fields for a CopyCatContext type field and automatically injects the context into that field.

public class HttpProtocol implements Protocol {
  private CopyCatContext context;
}

The complete protocol

Now that we have all that out of the way, here's the complete Protocol implementation:

public class HttpProtocol implements Protocol {
  private String host;
  private int port;
  private String path;

  public HttpProtocol() {
  }

  @UriHost
  public void setHost(String host) {
    this.host = host;
  }

  public String getHost() {
    return host;
  }

  @UriPort
  public void setPort(int port) {
    this.port = port;
  }

  @UriPath
  public void setPath(String path) {
    this.path = path;
  }

  public String getPath() {
    return path;
  }

  @Override
  public ProtocolClient createClient() {
    return new HttpProtocolClient(new HttpClient(host, port), path);
  }

  @Override
  public ProtocolServer createServer() {
    return new HttpProtocolServer(new HttpServer(host, port), path);
  }

}

Configuring the cluster with custom protocols

Let's take a look at an example of how to configure the CopyCat cluster when using custom protocols.

ClusterConfig cluster = new ClusterConfig("http://localhost:8080/copycat");
cluster.addRemoteMember("http://localhost:8081/copycat");
cluster.addRemoteMember("http://localhost:8082/copycat");

Note that CopyCat does not particularly care about the protocol of any given node in the cluster. Theoretically, different nodes could be connected together by any protocol they want (though I can't imagine why one would want to do such a thing). For the local replica, the protocol's server is used to receive messages. For remote replicas, each protocol instance's client is used to send messages to those replicas.

Built-in protocols

CopyCat maintains several built-in protocols, some of which are implemented on top of asynchronous frameworks like Netty and Vert.x.

Local Protocol

The local protocol is a simple protocol that communicates between contexts using direct method calls. This protocol is intended purely for testing.

Registry registry = new ConcurrentRegistry();
ClusterConfig cluster = new ClusterConfig("local:foo");
cluster.setRemoteMembers("local:bar", "local:baz");
CopyCatContext context = new CopyCatContext(new MyStateMachine, cluster, registry);

Note that you should use a ConcurrentRegistry when using the local protocol.

Netty TCP Protocol

The netty tcp protocol communicates between replicas using Netty TCP channels.

In order to use Netty protocols, you must add the copycat-netty project as a dependency. Once the copycat-netty library is available on your classpath, CopyCat will automatically find the Netty tcp protocol.

ClusterConfig cluster = new ClusterConfig("tcp://localhost:1234");

Vert.x Event Bus Protocol

The Vert.x eventbus protocol communicates between replicas on the Vert.x event bus. The event bus can either be created in a new Vertx instance or referenced in an existing Vertx instance.

In order to use Vert.x protocols, you must add the copycat-vertx project as a dependency.

ClusterConfig cluster = new ClusterConfig("eventbus://localhost:1234/foo");

// or...

ClusterConfig cluster = new ClusterConfig("eventbus://foo?vertx=$vertx");
Registry registry = new BasicRegistry();
registry.bind("vertx", vertx);

Vert.x TCP Protocol

The Vert.x tcp protocol communicates between replicas using a simple wire protocol over Vert.x NetClient and NetServer instances. To configure the tcp protocol, simply use an ordinary TCP address.

In order to use Vert.x protocols, you must add the copycat-vertx project as a dependency.

ClusterConfig cluster = new ClusterConfig("tcp://localhost:1234");
cluster.setRemoteMembers("tcp://localhost:1235", "tcp://localhost:1236");

Endpoints

We've gone through developing custom protocols for communicating between nodes in CopyCat, but a replicated state machine isn't much good if you can't get much information into it. Nodes need some sort of API that can be exposed to the outside world. For this, CopyCat provides an endpoints API that behaves very similarly to the protocol API.

To register an endpoint, simply add a file to the META-INF/services/net/kuujo/copycat/endpoints directory, using the endpoint name as the file name. The file should contain a string referencing a class that implements the Endpoint interface.

The Endpoint interface is very simple:

public interface Endpoint {

  void start(AsyncCallback<Void> callback);

  void stop(AsyncCallback<Void> callback);

}

Endpoints simply wrap the CopyCatContext and forward requests to the local context via the CopyCatContext.submitCommand method.

Using URI annotations with endpoints

Endpoints support all the same URI annotations as do protocols. See the protocol documentation on URI annotations for a tutorial on injecting arguments into custom endpoints.

Wrapping the CopyCatContext in an endpoint service

CopyCat provides a simple helper class for wrapping a CopyCatContext in an endpoint. To wrap a context, use the CopyCatclass. TheCopyCat` constructor simply accepts a single additional argument which is the endpoint URI.

ClusterConfig cluster = new ClusterConfig("tcp://localhost:5555", "tcp://localhost:5556", "tcp://localhost:5557");
CopyCat copycat = new CopyCat("http://localhost:8080", new MyStateMachine(), cluster)
copycat.start();

Built-in endpoints

CopyCat also provides a couple of built-in endpoints via the copycat-vertx project. In order to use Vert.x endpoints, you must add the copycat-vertx project as a dependency.

Vert.x Event Bus Endpoint

The Vert.x event bus endpoint receives submissions over the Vert.x event bus.

CopyCat copycat = new CopyCat("eventbus://localhost:8080/foo", new MyStateMachine(), cluster);

// or...

Registry registry = new BasicRegistry();
registry.bind("vertx", vertx);
CopyCat copycat = new CopyCat("eventbus://foo?vertx=$vertx", new MyStateMachine(), cluster, registry);

Vert.x TCP Endpoint

The Vert.x TCP endpoint uses a delimited JSON-based protocol:

CopyCat copycat = new CopyCat("tcp://localhost:8080", new MyStateMachine(), cluster);

Vert.x HTTP Endpoint

The Vert.x HTTP endpoint uses a simple HTTP server which accepts JSON POST requests to the command path. For instance, to submit the read command with {"key":"foo"} execute a POST request to the /read path using a JSON body containing command arguments.

CopyCat copycat = new CopyCat("http://localhost:8080", new MyStateMachine(), cluster);

About

Protocol-agnostic implementation of the Raft consensus algorithm

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published
0