A study of Grokking Streaming Systems, reimplemented in a Go way.
For reading the code, v0.4 is the recommended starting point.
- Previous versions had unreasonable package divisions (v0.1, v0.2).
- Some practically useless code was not removed during refactoring (v0.3).
- v0.4 still has a few unreasonable spots, but its readability should be better than the original Grokking Streaming Systems version.
- supports user-defined jobs: you only need to implement your own sources and operators
- supports running in parallel
- supports the round-robin and field grouping strategies
- supports stream graphs
- still processes each element individually (i.e. no window functions)
- at-most-once delivery (i.e. no guarantee that an event is successfully processed)
The design idea is straightforward: Operator and Source are both Components in a stream. You can see this in pkg/engine/types.go.
Based on that, the Operator interface requires you to implement Apply(Event, EventCollector) error; this function must be provided by the user. Similarly, the Source interface requires GetEvents(string, EventCollector). Its first argument is actually an event encoded as a string, and the source puts the resulting events into the EventCollector so they can be dispatched to downstream operators.
The stream consists of only these two kinds of components: sources and operators. The Component interface is implemented by the base structs source.Source and operator.Operator, which provide the common methods every concrete source and operator can reuse by embedding them. On top of that, the user only needs to care about implementing their business logic.
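To make that concrete, here is a minimal sketch of what these abstractions could look like; the real definitions live in pkg/engine/types.go, and everything beyond the Apply and GetEvents signatures quoted above (the GetName method, the exact shape of Event and EventCollector) is an assumption for illustration only.

// Sketch only: Event is treated as an opaque value; the real type may carry more structure.
type Event interface{}

// EventCollector gathers the events a component emits so the engine can dispatch them downstream.
type EventCollector interface {
    Add(event Event)
}

// Component is the part shared by sources and operators (assumed to expose at least a name).
type Component interface {
    GetName() string
}

// Source pulls data into the stream; the string argument is the raw event encoded as a string.
type Source interface {
    Component
    GetEvents(line string, eventCollector EventCollector)
}

// Operator applies user logic to one event and emits results for downstream operators.
type Operator interface {
    Component
    Apply(event Event, eventCollector EventCollector) error
}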
After defining the operator, the next question is how streamwork runs it, which brings in the executor. The executor design mirrors the component design: there is a component executor whose common behavior is shared by the source executor and the operator executor.
However, executors differ from components in two ways:
1. Users do not need to care about this layer at all; they only need to care about how Apply(Event, EventCollector) error and GetEvents(string, EventCollector) are implemented.
2. To support parallelism, one logical executor may spawn multiple instance executors that handle events separately (see the sketch below).
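As a rough, non-authoritative sketch of that second point (the real executor code lives under pkg/engine and will differ), each instance executor can be pictured as a goroutine draining a shared incoming queue; channelCollector and runOperatorInstances are invented names, and the snippet reuses the interfaces sketched above plus the standard library log package.

// channelCollector is an assumed EventCollector backed by an outgoing channel.
type channelCollector struct {
    out chan<- Event
}

func (c *channelCollector) Add(event Event) { c.out <- event }

// runOperatorInstances starts `parallelism` instance executors for one operator;
// each goroutine applies the user logic to events pulled from the shared incoming
// queue and pushes the results downstream.
func runOperatorInstances(op Operator, parallelism int, in <-chan Event, out chan<- Event) {
    for i := 0; i < parallelism; i++ {
        go func(instance int) {
            collector := &channelCollector{out: out}
            for event := range in {
                if err := op.Apply(event, collector); err != nil {
                    log.Printf("%s instance %d: %v", op.GetName(), instance, err)
                }
            }
        }(i)
    }
}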
- The Process interface defines how executors run.
- The event collector, event dispatcher, and event queue are helper data structures that move events through the stream. Because stream graphs are supported, one event may be dispatched to multiple operators and events from multiple upstream components may be merged into one operator, so some mechanism is needed to support that (see the sketch after this list).
- TODO.
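To make the grouping strategies concrete, here is a hedged sketch (invented names again, not the actual pkg/engine code, using hash/fnv from the standard library) of how a dispatcher might choose the target instance queue for each event under round-robin versus field grouping.

// dispatcher routes events to the queues of the downstream instance executors.
type dispatcher struct {
    queues  []chan Event // one queue per downstream instance executor
    nextIdx int          // cursor used by the round-robin strategy
}

// dispatchRoundRobin spreads events evenly across the downstream instances.
func (d *dispatcher) dispatchRoundRobin(event Event) {
    d.queues[d.nextIdx] <- event
    d.nextIdx = (d.nextIdx + 1) % len(d.queues)
}

// dispatchByField sends all events that share a key to the same instance,
// which is what lets a stateful operator (such as a counter) keep per-key
// state in a local map without any coordination.
func (d *dispatcher) dispatchByField(key string, event Event) {
    h := fnv.New32a()
    h.Write([]byte(key))
    d.queues[h.Sum32()%uint32(len(d.queues))] <- event
}

Field grouping is the strategy a counting operator relies on: every event for the same car type lands on the same counter instance, so each instance's local map stays consistent (see the vehicle count example below).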
Two steps:
- define your own operator and source
- use your operator and source to create a new job and start it
Take the vehicle count job as an example: just embed source.Source and operator.Operator, and add whatever data structures you need to support your logic in Apply(Event, EventCollector) error or GetEvents(string, EventCollector).
// SensorReader is a custom source; embedding source.Source gives it the common source behavior.
type SensorReader struct {
    source.Source
}

// VehicleCounter is a custom operator that keeps a count per vehicle type.
type VehicleCounter struct {
    operator.Operator
    counter map[carType]int
}
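For completeness, one plausible shape of the constructors and the two user-implemented methods is sketched below; the real versions live in pkg/jobs/vehicle_count_job, and details such as the source.NewSource / operator.NewOperator constructors, carType being a string-based type, and the use of fmt and strings are assumptions here, not the repository's actual API.

// NewSensorReader builds the custom source used in the job below (assumed constructor shape).
func NewSensorReader(name string) *SensorReader {
    return &SensorReader{Source: source.NewSource(name)} // source.NewSource is assumed
}

// NewVehicleCounter builds the custom operator and its counting state (assumed constructor shape).
func NewVehicleCounter(name string) *VehicleCounter {
    return &VehicleCounter{
        Operator: operator.NewOperator(name), // operator.NewOperator is assumed
        counter:  make(map[carType]int),
    }
}

// GetEvents turns one raw line (the event encoded as a string) into an event
// and hands it to the collector for the downstream operators.
func (s *SensorReader) GetEvents(line string, eventCollector EventCollector) {
    eventCollector.Add(strings.TrimSpace(line)) // e.g. "car", "truck", "van"
}

// Apply counts each incoming vehicle type in the operator's local map.
func (c *VehicleCounter) Apply(event Event, eventCollector EventCollector) error {
    vehicle, ok := event.(string)
    if !ok {
        return fmt.Errorf("unexpected event type %T", event)
    }
    c.counter[carType(vehicle)]++
    return nil
}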
Take a look at the example below (more details are in pkg/jobs/vehicle_count_job/job_test.go); just use ApplyOperator to connect operators to each other.
// Build the job: add the source, then attach the counting operator to its output stream.
vehicleJob := job.NewJob("vehicle count base test")
bridgeStream, err := vehicleJob.AddSource(NewSensorReader("sensor-reader"))
if err != nil {
    panic(err)
}
bridgeStream.ApplyOperator(NewVehicleCounter("vehicle counter"))
Once you start the job with the starter below, you can open another terminal and use nc localhost 9990 to connect to streamwork.
// Wrap the job in a starter and start it.
starter := job.NewJobStarter(vehicleJob)
starter.Start()
- git clone git@github.com:jensenojs/streamwork.git
- install Go and the necessary dependencies
- go run pkg/main.go
- once the job starts, open another terminal and use nc localhost 9990 to connect to streamwork
PS: if you want to debug the jobs in the jobs package, make sure you run in debug mode so the output shows up in the debug console.