124 lines
2.9 KiB
Go
124 lines
2.9 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"math/rand"
|
|
"net/http"
|
|
_ "net/http/pprof"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
)
|
|
|
|
// MQTT_SERVER: format: tcp://192.168.178.8:1883
|
|
// INFLUX_SERVER: http://192.168.178.8:8086
|
|
var haveIFdb bool = false
|
|
var broker = flag.String("mqtt", os.Getenv("MQTT_SERVER"), "the broker protocol://ip:port; protocol can be 'ws' or 'tcp'.")
|
|
var ifhost = flag.String("influx", os.Getenv("INFLUX_SERVER"), "the http write url of the influx db; includes port if not 80.")
|
|
var verbose = flag.Bool("v", false, "log events.")
|
|
|
|
var ifch = make(chan string, 128)
|
|
|
|
func checkForInflux() {
|
|
respi, err := http.Get(*ifhost + "/ping")
|
|
if err != nil || respi.StatusCode != 204 {
|
|
fmt.Fprintf(os.Stderr, "InfluxDB error or not present.\n")
|
|
} else {
|
|
haveIFdb = true
|
|
go func() {
|
|
for {
|
|
updateInflux(<-ifch)
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
func updateInflux(ifData string) {
|
|
if ifData != "" {
|
|
if *verbose {
|
|
fmt.Fprintf(os.Stderr, "ifdata: %s\n", ifData)
|
|
}
|
|
if haveIFdb {
|
|
respi, err := http.Post(*ifhost+"/write?db=pm", "text/plain", bytes.NewBufferString(ifData))
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error post to influx: %v\n", err)
|
|
haveIFdb = false
|
|
} else {
|
|
respi.Body.Close()
|
|
}
|
|
} else {
|
|
if *verbose {
|
|
fmt.Fprintln(os.Stderr, "no post")
|
|
}
|
|
checkForInflux()
|
|
}
|
|
}
|
|
}
|
|
|
|
func connectSubscribe(client mqtt.Client) {
|
|
qos := 0
|
|
|
|
for {
|
|
if token := client.Connect(); token.Wait() && token.Error() == nil {
|
|
break
|
|
}
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
|
|
// ----------------------------------------
|
|
// subject smr5 will depend on smr2mqtt choice, of cource
|
|
var sub0 = "/smr5/#"
|
|
if token := client.Subscribe(sub0, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
|
|
parts := strings.Split(msg.Topic(),"/")
|
|
ifch <-fmt.Sprintf("smr5 %s=%s",parts[3],string(msg.Payload()))
|
|
}); token.Wait() && token.Error() != nil {
|
|
fmt.Println(token.Error())
|
|
os.Exit(1)
|
|
}
|
|
if *verbose {
|
|
log.Println("subscribed " + sub0)
|
|
}
|
|
|
|
|
|
}
|
|
|
|
func main() {
|
|
var client mqtt.Client
|
|
|
|
flag.Parse()
|
|
rand.Seed(time.Now().UTC().UnixNano())
|
|
opts := mqtt.NewClientOptions()
|
|
opts.AddBroker(*broker)
|
|
rand.Seed(time.Now().UTC().UnixNano())
|
|
opts.SetClientID(fmt.Sprintf("%d",20000 + rand.Intn(10000)))
|
|
opts.SetCleanSession(true)
|
|
|
|
checkForInflux()
|
|
|
|
opts = opts.SetOnConnectHandler(func(client mqtt.Client) {
|
|
log.Printf("connected to: " + *broker + ".\n")
|
|
})
|
|
|
|
opts = opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
|
|
log.Printf("connection lost: %v.\n", err)
|
|
connectSubscribe(client)
|
|
})
|
|
|
|
// wait a while; seems mosquitto needs time to get ready
|
|
time.Sleep(3 * time.Second)
|
|
client = mqtt.NewClient(opts)
|
|
connectSubscribe(client)
|
|
|
|
sig := make(chan os.Signal,8)
|
|
signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
|
|
s := <-sig
|
|
fmt.Printf("\x1b[2KSignal: %v\n", s)
|
|
client.Disconnect(250)
|
|
}
|