hems/mqifproxy/mqifproxy.go
2024-05-18 14:14:49 +02:00

124 lines
2.9 KiB
Go

package main
import (
"bytes"
"flag"
"fmt"
"log"
"math/rand"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// MQTT_SERVER: format: tcp://192.168.178.8:1883
// INFLUX_SERVER: http://192.168.178.8:8086
var haveIFdb bool = false
var broker = flag.String("mqtt", os.Getenv("MQTT_SERVER"), "the broker protocol://ip:port; protocol can be 'ws' or 'tcp'.")
var ifhost = flag.String("influx", os.Getenv("INFLUX_SERVER"), "the http write url of the influx db; includes port if not 80.")
var verbose = flag.Bool("v", false, "log events.")
var ifch = make(chan string, 128)
func checkForInflux() {
respi, err := http.Get(*ifhost + "/ping")
if err != nil || respi.StatusCode != 204 {
fmt.Fprintf(os.Stderr, "InfluxDB error or not present.\n")
} else {
haveIFdb = true
go func() {
for {
updateInflux(<-ifch)
}
}()
}
}
func updateInflux(ifData string) {
if ifData != "" {
if *verbose {
fmt.Fprintf(os.Stderr, "ifdata: %s\n", ifData)
}
if haveIFdb {
respi, err := http.Post(*ifhost+"/write?db=pm", "text/plain", bytes.NewBufferString(ifData))
if err != nil {
fmt.Fprintf(os.Stderr, "Error post to influx: %v\n", err)
haveIFdb = false
} else {
respi.Body.Close()
}
} else {
if *verbose {
fmt.Fprintln(os.Stderr, "no post")
}
checkForInflux()
}
}
}
func connectSubscribe(client mqtt.Client) {
qos := 0
for {
if token := client.Connect(); token.Wait() && token.Error() == nil {
break
}
time.Sleep(5 * time.Second)
}
// ----------------------------------------
// subject smr5 will depend on smr2mqtt choice, of cource
var sub0 = "/smr5/#"
if token := client.Subscribe(sub0, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
parts := strings.Split(msg.Topic(),"/")
ifch <-fmt.Sprintf("smr5 %s=%s",parts[3],string(msg.Payload()))
}); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
if *verbose {
log.Println("subscribed " + sub0)
}
}
func main() {
var client mqtt.Client
flag.Parse()
rand.Seed(time.Now().UTC().UnixNano())
opts := mqtt.NewClientOptions()
opts.AddBroker(*broker)
rand.Seed(time.Now().UTC().UnixNano())
opts.SetClientID(fmt.Sprintf("%d",20000 + rand.Intn(10000)))
opts.SetCleanSession(true)
checkForInflux()
opts = opts.SetOnConnectHandler(func(client mqtt.Client) {
log.Printf("connected to: " + *broker + ".\n")
})
opts = opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
log.Printf("connection lost: %v.\n", err)
connectSubscribe(client)
})
// wait a while; seems mosquitto needs time to get ready
time.Sleep(3 * time.Second)
client = mqtt.NewClient(opts)
connectSubscribe(client)
sig := make(chan os.Signal,8)
signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
s := <-sig
fmt.Printf("\x1b[2KSignal: %v\n", s)
client.Disconnect(250)
}