This is a simple implementation of an event bus in golang. Actually support follwing pattern:
- publish/subscribe
- request/response
To start use eventbus in your project, you can run the following command.
go get github.com/stanipetrosyan/go-eventbus
And import
import (
goeventbus "github.com/stanipetrosyan/go-eventbus"
)
Simple example of publish/subscribe pattern.
eventbus = goeventbus.NewEventBus()
address := "topic"
options := goeventbus.NewMessageHeadersBuilder().SetHeader("header", "value").Build()
message := goeventbus.NewMessageBuilder().SetPayload("Hi Topic").SetHeaders(options).Build()
eventbus.Channel(address).Subscriber().Listen(func(dc goeventbus.Context) {
fmt.Printf("Message %s\n", dc.Result().Data)
})
eventbus.Channel(address).Publisher().Publish(message)
Simple example of request/response pattern.
eventbus = goeventbus.NewEventBus()
address := "topic"
message := goeventbus.NewMessageBuilder().SetPayload("Hi Topic").Build()
eventbus.Channel(address).Subscriber().Listen(func(context goeventbus.Context) {
fmt.Printf("Message %s\n", context.Result().Extract())
context.Reply("Hello from subscriber")
})
eventbus.Channel(address).Publisher().Request(message, func(context goeventbus.Context) {
fmt.Printf("Message %s\n", context.Result().Extract())
})
For publishing, you need to create a Message object using this method.
message := goeventbus.NewMessageBuilder().SetPayload("Hi Topic").SetHeaders(options).Build()
Each message can have some options:
options := goeventbus.NewMessageHeadersBuilder().SetHeader("header", "value").Build()
message := goeventbus.NewMessageBuilder().setHeaders(options<
7060
/span>).Build()
eventBus.Channel("address").Publisher().Publish(message)
A processor works like a middleware, in fact forwards messages only if context Next method is called. The entity works as Subscriber: Listen method accept a callback with context.
The processor intercept message from publisher to subscribers on a specific channel.
eventbus.Channel("topic1").Processor().Listen(func(context goeventbus.Context) {
if context.Result().ExtractHeaders().Contains("header") {
context.Next()
}
})
Inside processor is possible change message to forward using Map
method:
eventbus.Channel("topic1").Processor().Listen(func(context goeventbus.Context) {
if context.Result().ExtractHeaders().Contains("header") {
newMessage := NewMessageBuilder().SetPayload("new message").Build()
context.Map(newMessage).Next()
}
})
A Network bus create a tcp connection between different services.
NetworkBus is a wrapper of local eventbus.
A simple server/client example is in examples/networkbus
directory.