-
Notifications
You must be signed in to change notification settings - Fork 671
MF-1079 - Add MQTT forwarder #1164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1164 +/- ##
=======================================
Coverage 74.03% 74.03%
=======================================
Files 102 102
Lines 6883 6883
=======================================
Hits 5096 5096
Misses 1433 1433
Partials 354 354 Continue to review full report at Codecov.
|
327bf48
to
ccbfd7c
Compare
bfb762b
to
3126070
Compare
messaging/mqtt/subscriber.go
Outdated
package mqtt | ||
|
||
import ( | ||
"errors" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not "github.com/mainflux/mainflux/errors"
?
cmd/mqtt/main.go
Outdated
@@ -130,6 +136,22 @@ func main() { | |||
|
|||
cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout) | |||
|
|||
ps, err := nats.NewPubSub(cfg.natsURL, "mqtt", logger) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would use natsPubSub
here instead of ps
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe nps
can be a good compromise? Also mqttPub
can become mp
and pub
below can become np
.
In any case, using long or short names - we should be consistent - which is not currently the case.
cmd/mqtt/main.go
Outdated
logger.Error(fmt.Sprintf("Failed to create MQTT publisher: %s", err)) | ||
os.Exit(1) | ||
} | ||
if err := mqtt.Forward(nats.SubjectAllChannels, ps, mqttPub); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we configure nats.SubjectAllChannels
from pubsub?
mqtt/forwarder.go
Outdated
if msg.Protocol == protocol { | ||
return nil | ||
} | ||
topic := fmt.Sprintf("%s.%s.%s.%s", channels, msg.Channel, messages, msg.Subtopic) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is topic
ok if Subtopic
is empty?
cmd/mqtt/main.go
Outdated
@@ -130,6 +136,22 @@ func main() { | |||
|
|||
cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout) | |||
|
|||
ps, err := nats.NewPubSub(cfg.natsURL, "mqtt", logger) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe nps
can be a good compromise? Also mqttPub
can become mp
and pub
below can become np
.
In any case, using long or short names - we should be consistent - which is not currently the case.
messaging/mqtt/pubsub.go
Outdated
SetClientID(id). | ||
SetCleanSession(false) | ||
client := mqtt.NewClient(opts) | ||
tkn := client.Connect() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does tkn
stand for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It stands for token
. That's the construct Paho internally returns on the async operations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not token
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is bit confusing IMHO, maybe comment could help?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@drasko why a comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not token
than?
messaging/mqtt/subscriber.go
Outdated
import ( | 8000||
"errors" | ||
"fmt" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kill this white line
dd75b21
to
be96166
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
func (pub publisher) Publish(topic string, msg messaging.Message) error { | ||
token := pub.client.Publish(topic, qos, false, msg.Payload) | ||
ok := token.WaitTimeout(pub.timeout) | ||
if ok && token.Error() != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if token.Error() != nil
in the previous line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dont you like this if token.Wait() && token.Error() != nil {
better ?
mqtt/forwarder.go
Outdated
// Use concatenation instead of mft.Sprintf for the | ||
// sake of simplicity and performance. | ||
topic := channels + "." + msg.Channel + "." + messages | ||
if msg.Subtopic != "" { | ||
topic += "." + msg.Subtopic | ||
} | ||
topic = strings.ReplaceAll(topic, ".", "/") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not simply use /
?
Also, use fmt.Sprint() to concatenate strings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to /
. String buffer would be an overkill here, and the comment explains why I don't want to use Sprintf
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, and I think we should consider avoiding sprintf
in general where +
string concatenation can bring simplicity. We need to sync internally and understand when to use one and when another.
token := sub.client.Subscribe(topic, qos, sub.mqttHandler(handler)) | ||
ok := token.WaitTimeout(sub.timeout) | ||
if ok && token.Error() != nil { | ||
return token.Error() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we return token.Error()
on multiple places it would be good to wrap with another error so that exact place can be more easily detected
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would leave error wrapping for the layer above MQTT Publisher or Subscriber. We only use it as a wrapper around MQTT client, so returning the real error is sufficient, IMHO. However, I'll improve the message of the errors we return if token is expired to be more specific ("failed to subscribe/unsubscribe due to timeout reached" is better than just "failed to subscribe/unsubscribe").
f75d21a
to
fe27966
Compare
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
cmd/mqtt/main.go
Outdated
@@ -16,6 +16,7 @@ import ( | |||
"github.com/mainflux/mainflux" | |||
"github.com/mainflux/mainflux/logger" | |||
"github.com/mainflux/mainflux/messaging" | |||
mqttpub "github.com/mainflux/mainflux/messaging/mqtt" | |||
"github.com/mainflux/mainflux/messaging/nats" | |||
mqtt "github.com/mainflux/mainflux/mqtt" | |||
mr "github.com/mainflux/mainflux/mqtt/redis" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would use mqttRedis
here.
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
* Add MQTT forwarder Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Cleanup forwarder code Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Use MQTT Publisher in MQTT forwarder Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Cleanup MQTT messaging Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Add Paho client timeout errors Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Simplify MQTT fowarder Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix naming in main method Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Use interface and struct instead of function Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Use Mainflux errors package Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename `tkn` to `token` Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Use "/" instead of "." as topic separator Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Use async MQTT Publisher Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix timeout errors messages Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Add connect token check Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename package alias Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
What does this do?
This pull request introduces MQTT forwarder for multi-protocol support explained in #1079.
Which issue(s) does this PR fix/relate to?
This pull request fixes #1079.
List any changes that modify/break current functionality
Messages published from other protocols will be received on MQTT subs.
Have you included tests for your changes?
No.
Did you document any new/modified functionality?
This fix will make the MQTT adapter work as intended.