8000 MF-1079 - Add MQTT forwarder by dborovcanin · Pull Request #1164 · absmach/supermq · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

MF-1079 - Add MQTT forwarder #1164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 7, 2020
Merged

Conversation

dborovcanin
Copy link
Collaborator
@dborovcanin dborovcanin commented May 5, 2020

What does this do?

This pull request introduces MQTT forwarder for multi-protocol support explained in #1079.

Which issue(s) does this PR fix/relate to?

This pull request fixes #1079.

List any changes that modify/break current functionality

Messages published from other protocols will be received on MQTT subs.

Have you included tests for your changes?

No.

Did you document any new/modified functionality?

This fix will make the MQTT adapter work as intended.

@dborovcanin dborovcanin requested a review from a team as a code owner May 5, 2020 09:35
@codecov-io
Copy link
codecov-io commented May 5, 2020

Codecov Report

Merging #1164 into master will not change coverage.
The diff coverage is n/a.

Impacted file tree graph

@@           Coverage Diff           @@
##           master    #1164   +/-   ##
=======================================
  Coverage   74.03%   74.03%           
=======================================
  Files         102      102           
  Lines        6883     6883           
=======================================
  Hits         5096     5096           
  Misses       1433     1433           
  Partials      354      354           

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update f9432c6...fe27966. Read the comment docs.

@dborovcanin dborovcanin changed the base branch from relay to master May 5, 2020 16:30
@dborovcanin dborovcanin changed the title NOISSUE - Add MQTT forwarder MF-1079 - Add MQTT forwarder May 5, 2020
package mqtt

import (
"errors"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not "github.com/mainflux/mainflux/errors" ?

cmd/mqtt/main.go Outdated
@@ -130,6 +136,22 @@ func main() {

cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)

ps, err := nats.NewPubSub(cfg.natsURL, "mqtt", logger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use natsPubSub here instead of ps

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe nps can be a good compromise? Also mqttPub can become mp and pub below can become np.

In any case, using long or short names - we should be consistent - which is not currently the case.

cmd/mqtt/main.go Outdated
logger.Error(fmt.Sprintf("Failed to create MQTT publisher: %s", err))
os.Exit(1)
}
if err := mqtt.Forward(nats.SubjectAllChannels, ps, mqttPub); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we configure nats.SubjectAllChannels from pubsub?

if msg.Protocol == protocol {
return nil
}
topic := fmt.Sprintf("%s.%s.%s.%s", channels, msg.Channel, messages, msg.Subtopic)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is topic ok if Subtopic is empty?

cmd/mqtt/main.go Outdated
@@ -130,6 +136,22 @@ func main() {

cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)

ps, err := nats.NewPubSub(cfg.natsURL, "mqtt", logger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe nps can be a good compromise? Also mqttPub can become mp and pub below can become np.

In any case, using long or short names - we should be consistent - which is not currently the case.

SetClientID(id).
SetCleanSession(false)
client := mqtt.NewClient(opts)
tkn := client.Connect()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does tkn stand for?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It stands for token. That's the construct Paho internally returns on the async operations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not token?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is bit confusing IMHO, maybe comment could help?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@drasko why a comment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not token than?

8000
import (
"errors"
"fmt"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kill this white line

drasko
drasko previously approved these changes May 6, 2020
Copy link
Contributor
@drasko drasko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

func (pub publisher) Publish(topic string, msg messaging.Message) error {
token := pub.client.Publish(topic, qos, false, msg.Payload)
ok := token.WaitTimeout(pub.timeout)
if ok && token.Error() != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if token.Error() != nil in the previous line?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dont you like this if token.Wait() && token.Error() != nil { better ?

Comment on lines 42 to 48
// Use concatenation instead of mft.Sprintf for the
// sake of simplicity and performance.
topic := channels + "." + msg.Channel + "." + messages
if msg.Subtopic != "" {
topic += "." + msg.Subtopic
}
topic = strings.ReplaceAll(topic, ".", "/")
Copy link
Contributor
@manuio manuio May 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not simply use /?
Also, use fmt.Sprint() to concatenate strings

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to /. String buffer would be an overkill here, and the comment explains why I don't want to use Sprintf.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, and I think we should consider avoiding sprintf in general where + string concatenation can bring simplicity. We need to sync internally and understand when to use one and when another.

token := sub.client.Subscribe(topic, qos, sub.mqttHandler(handler))
ok := token.WaitTimeout(sub.timeout)
if ok && token.Error() != nil {
return token.Error()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we return token.Error() on multiple places it would be good to wrap with another error so that exact place can be more easily detected

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would leave error wrapping for the layer above MQTT Publisher or Subscriber. We only use it as a wrapper around MQTT client, so returning the real error is sufficient, IMHO. However, I'll improve the message of the errors we return if token is expired to be more specific ("failed to subscribe/unsubscribe due to timeout reached" is better than just "failed to subscribe/unsubscribe").

@dborovcanin dborovcanin force-pushed the mqtt-forwarder branch 2 times, most recently from f75d21a to fe27966 Compare May 6, 2020 17:40
dborovcanin added 10 commits May 6, 2020 19:41
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
10000
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
cmd/mqtt/main.go Outdated
@@ -16,6 +16,7 @@ import (
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging"
mqttpub "github.com/mainflux/mainflux/messaging/mqtt"
"github.com/mainflux/mainflux/messaging/nats"
mqtt "github.com/mainflux/mainflux/mqtt"
mr "github.com/mainflux/mainflux/mqtt/redis"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use mqttRedis here.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
Copy link
Contributor
@manuio manuio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@manuio manuio merged commit b8be181 into absmach:master May 7, 2020
manuio pushed a commit that referenced this pull request Oct 12, 2020
* Add MQTT forwarder

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Cleanup forwarder code

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Use MQTT Publisher in MQTT forwarder

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Cleanup MQTT messaging

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Add Paho client timeout errors

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Simplify MQTT fowarder

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix naming in main method

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Use interface and struct instead of function

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Use Mainflux errors package

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Rename `tkn` to `token`

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Use "/" instead of "." as topic separator

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Use async MQTT Publisher

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix timeout errors messages

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Add connect token check

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Rename package alias

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
@dborovcanin dborovcanin deleted the mqtt-forwarder branch February 17, 2021 09:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add MQTT forwarder
6 participants
0