package main import ( "bytes" "flag" "fmt" "log" "math/rand" "net/http" _ "net/http/pprof" "os" "os/signal" "strings" "syscall" "time" mqtt "github.com/eclipse/paho.mqtt.golang" ) // MQTT_SERVER: format: tcp://192.168.178.8:1883 // INFLUX_SERVER: http://192.168.178.8:8086 var haveIFdb bool = false var broker = flag.String("mqtt", os.Getenv("MQTT_SERVER"), "the broker protocol://ip:port; protocol can be 'ws' or 'tcp'.") var ifhost = flag.String("influx", os.Getenv("INFLUX_SERVER"), "the http write url of the influx db; includes port if not 80.") var verbose = flag.Bool("v", false, "log events.") var ifch = make(chan string, 128) func checkForInflux() { respi, err := http.Get(*ifhost + "/ping") if err != nil || respi.StatusCode != 204 { fmt.Fprintf(os.Stderr, "InfluxDB error or not present.\n") } else { haveIFdb = true go func() { for { updateInflux(<-ifch) } }() } } func updateInflux(ifData string) { if ifData != "" { if *verbose { fmt.Fprintf(os.Stderr, "ifdata: %s\n", ifData) } if haveIFdb { respi, err := http.Post(*ifhost+"/write?db=pm", "text/plain", bytes.NewBufferString(ifData)) if err != nil { fmt.Fprintf(os.Stderr, "Error post to influx: %v\n", err) haveIFdb = false } else { respi.Body.Close() } } else { if *verbose { fmt.Fprintln(os.Stderr, "no post") } checkForInflux() } } } func connectSubscribe(client mqtt.Client) { qos := 0 for { if token := client.Connect(); token.Wait() && token.Error() == nil { break } time.Sleep(5 * time.Second) } // ---------------------------------------- // subject smr5 will depend on smr2mqtt choice, of cource var sub0 = "/smr5/#" if token := client.Subscribe(sub0, byte(qos), func(client mqtt.Client, msg mqtt.Message) { parts := strings.Split(msg.Topic(),"/") ifch <-fmt.Sprintf("smr5 %s=%s",parts[3],string(msg.Payload())) }); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } if *verbose { log.Println("subscribed " + sub0) } } func main() { var client mqtt.Client flag.Parse() rand.Seed(time.Now().UTC().UnixNano()) opts := mqtt.NewClientOptions() opts.AddBroker(*broker) rand.Seed(time.Now().UTC().UnixNano()) opts.SetClientID(fmt.Sprintf("%d",20000 + rand.Intn(10000))) opts.SetCleanSession(true) checkForInflux() opts = opts.SetOnConnectHandler(func(client mqtt.Client) { log.Printf("connected to: " + *broker + ".\n") }) opts = opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { log.Printf("connection lost: %v.\n", err) connectSubscribe(client) }) // wait a while; seems mosquitto needs time to get ready time.Sleep(3 * time.Second) client = mqtt.NewClient(opts) connectSubscribe(client) sig := make(chan os.Signal,8) signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL) s := <-sig fmt.Printf("\x1b[2KSignal: %v\n", s) client.Disconnect(250) }