From 6ae35ffc71edff4edcd0270ee17b5fb0b5e197b3 Mon Sep 17 00:00:00 2001 From: Abdelatif Guettouche Date: Sat, 27 Mar 2021 19:25:03 +0100 Subject: [PATCH] examples: Add an MQTT-C example. Signed-off-by: Abdelatif Guettouche --- examples/mqttc/Kconfig | 31 ++++ examples/mqttc/Make.defs | 23 +++ examples/mqttc/Makefile | 12 ++ examples/mqttc/README.md | 25 +++ examples/mqttc/mqttc_pub.c | 321 +++++++++++++++++++++++++++++++++++++ 5 files changed, 412 insertions(+) create mode 100644 examples/mqttc/Kconfig create mode 100644 examples/mqttc/Make.defs create mode 100644 examples/mqttc/Makefile create mode 100644 examples/mqttc/README.md create mode 100644 examples/mqttc/mqttc_pub.c diff --git a/examples/mqttc/Kconfig b/examples/mqttc/Kconfig new file mode 100644 index 000000000..279378da7 --- /dev/null +++ b/examples/mqttc/Kconfig @@ -0,0 +1,31 @@ +# +# For a description of the syntax of this configuration file, +# see the file kconfig-language.txt in the NuttX tools repository. +# + +config EXAMPLES_MQTTC + tristate "Enable MQTT-C Example" + default n + depends on NETUTILS_MQTTC + ---help--- + Enable a simple MQTT-C publisher example + +if EXAMPLES_MQTTC + +config EXAMPLES_MQTTC_PROGNAME + string "Program name" + default "mqttc_pub" + +config EXAMPLES_MQTTC_STACKSIZE + int "Task's stack size" + default 8192 + +config EXAMPLES_MQTTC_TXSIZE + int "TX Buffer size" + default 256 + +config EXAMPLES_MQTTC_RXSIZE + int "RX Buffer size" + default 256 + +endif diff --git a/examples/mqttc/Make.defs b/examples/mqttc/Make.defs new file mode 100644 index 000000000..80ed6a46a --- /dev/null +++ b/examples/mqttc/Make.defs @@ -0,0 +1,23 @@ +############################################################################ +# apps/examples/mqttc/Make.defs +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. The +# ASF licenses this file to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance with the +# License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +############################################################################ + +ifneq ($(CONFIG_EXAMPLES_MQTTC),) +CONFIGURED_APPS += $(APPDIR)/examples/mqttc +endif diff --git a/examples/mqttc/Makefile b/examples/mqttc/Makefile new file mode 100644 index 000000000..5c844c09c --- /dev/null +++ b/examples/mqttc/Makefile @@ -0,0 +1,12 @@ +include $(APPDIR)/Make.defs + +PROGNAME = $(CONFIG_EXAMPLES_MQTTC_PROGNAME) +PRIORITY = SCHED_PRIORITY_DEFAULT +STACKSIZE = $(CONFIG_EXAMPLES_MQTTC_STACKSIZE) +MODULE = $(CONFIG_EXAMPLES_MQTTC) + +# MQTT-C example source code + +MAINSRC = mqttc_pub.c + +include $(APPDIR)/Application.mk diff --git a/examples/mqttc/README.md b/examples/mqttc/README.md new file mode 100644 index 000000000..1917bca4c --- /dev/null +++ b/examples/mqttc/README.md @@ -0,0 +1,25 @@ +This is a simple MQTT publisher example using MQTT-C + +By default it publishes to the "test" topic and exits. Default behaviour +including, host, port, topic, message and loop count can be changed through +different arguments. + +To test: +From the host start an MQTT broker and subscribe to the "test" topic. Here +mosquitto is used: + +``` +mosquitto& +mosquitto_sub -t test +``` +Make sure that mosquitto is configured in local mode only. + +From the nsh: + +Launch the built-in app `mqttc_pub` specifying the host: + +``` +mqttc_pub -h HOST +``` + +The target will publish the message "test". diff --git a/examples/mqttc/mqttc_pub.c b/examples/mqttc/mqttc_pub.c new file mode 100644 index 000000000..80f6a904b --- /dev/null +++ b/examples/mqttc/mqttc_pub.c @@ -0,0 +1,321 @@ +/**************************************************************************** + * apps/examples/mqttc/mqttc_pub.c + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The + * ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + ****************************************************************************/ + +/**************************************************************************** + * Included Files + ****************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include + +/**************************************************************************** + * Pre-processor Definitions + ****************************************************************************/ + +/**************************************************************************** + * Private Types + ****************************************************************************/ + +struct mqttc_cfg_s +{ + struct mqtt_client client; + const FAR char *host; + const FAR char *port; + const FAR char *topic; + const FAR char *msg; + const FAR char *id; + uint8_t sendbuf[CONFIG_EXAMPLES_MQTTC_TXSIZE]; + uint8_t recvbuf[CONFIG_EXAMPLES_MQTTC_RXSIZE]; + uint32_t tmo; + uint8_t flags; + uint8_t qos; +}; + +/**************************************************************************** + * Private Function Prototypes + ****************************************************************************/ + +static FAR void *client_refresher(FAR void *data); +static void parsearg(int argc, char *argv[], struct mqttc_cfg_s *cfg, + int *n); +static int initserver(const FAR struct mqttc_cfg_s *cfg); + +/**************************************************************************** + * Private Functions + ****************************************************************************/ + +/**************************************************************************** + * Name: client_refresher + * + * Description: + * The client's refresher. This function triggers back-end routines to + * handle ingress/egress traffic to the broker. + * + ****************************************************************************/ + +static FAR void *client_refresher(FAR void *data) +{ + while (1) + { + mqtt_sync((FAR struct mqtt_client *)data); + usleep(100000U); + } + + return NULL; +} + +/**************************************************************************** + * Name: parsearg + * + * Description: + * Parse command line arguments. + * + ****************************************************************************/ + +static void parsearg(int argc, char *argv[], struct mqttc_cfg_s *cfg, int *n) +{ + int opt; + + while ((opt = getopt(argc, argv, "h:p:m:t:n:")) != ERROR) + { + switch (opt) + { + case 'h': + cfg->host = optarg; + break; + + case 'p': + cfg->port = optarg; + break; + + case 'm': + cfg->msg = optarg; + break; + + case 't': + cfg->topic = optarg; + break; + + case 'n': + *n = strtol(optarg, NULL, 10); + break; + + default: + fprintf(stderr, "ERROR: Unrecognized option\n"); + break; + } + } +} + +/**************************************************************************** + * Name: initserver + * + * Description: + * Resolve server's name and try to establish a connection. + * + ****************************************************************************/ + +static int initserver(const FAR struct mqttc_cfg_s *cfg) +{ + struct addrinfo hints; + struct addrinfo *servinfo; + struct addrinfo *itr; + int fd; + int ret; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + + ret = getaddrinfo(cfg->host, cfg->port, &hints, &servinfo); + if (ret < 0) + { + printf("ERROR! getaddrinfo() failed:%s\n", gai_strerror(ret)); + } + + itr = servinfo; + do + { + fd = socket(itr->ai_family, itr->ai_socktype, itr->ai_protocol); + if (fd < 0) + { + continue; + } + + ret = connect(fd, itr->ai_addr, itr->ai_addrlen); + if (ret == 0) + { + break; + } + + close(fd); + } + while ((itr = itr->ai_next) != NULL); + + freeaddrinfo(servinfo); + + if (fd < 0) + { + printf("ERROR! Couldn't create socket\n"); + return -1; + } + + ret = fcntl(fd, F_GETFL, 0); + if (ret < 0) + { + printf("ERROR! fcntl() F_GETFL failed, errno:%d\n", errno); + return -1; + } + + ret = fcntl(fd, F_SETFL, ret | O_NONBLOCK); + if (ret < 0) + { + printf("ERROR! fcntl() F_SETFL failed, errno:%d\n", errno); + return -1; + } + + return fd; +} + +/**************************************************************************** + * Public Functions + ****************************************************************************/ + +int main(int argc, char *argv[]) +{ + int sockfd; + enum MQTTErrors mqtterr; + pthread_t thrdid; + int n = 1; + struct mqttc_cfg_s mqtt_cfg = + { + .port = "1883", + .topic = "test", + .msg = "test", + .flags = MQTT_CONNECT_CLEAN_SESSION, + .tmo = 400, + .id = NULL, + .qos = MQTT_PUBLISH_QOS_0, + }; + + parsearg(argc, argv, &mqtt_cfg, &n); + sockfd = initserver(&mqtt_cfg); + if (sockfd < 0) + { + return -1; + } + + mqtterr = mqtt_init(&mqtt_cfg.client, sockfd, + mqtt_cfg.sendbuf, sizeof(mqtt_cfg.sendbuf), + mqtt_cfg.recvbuf, sizeof(mqtt_cfg.recvbuf), + NULL); + if (mqtterr != MQTT_OK) + { + printf("ERRPR! mqtt_init() failed.\n"); + goto err_with_socket; + } + + mqtterr = mqtt_connect(&mqtt_cfg.client, mqtt_cfg.id, + NULL, /* Will topic */ + NULL, /* Will message */ + 0, /* Will message size */ + NULL, /* User name */ + NULL, /* Password */ + mqtt_cfg.flags, mqtt_cfg.tmo); + + if (mqtterr != MQTT_OK) + { + printf("ERROR! mqtt_connect() failed\n"); + goto err_with_socket; + } + + if (mqtt_cfg.client.error != MQTT_OK) + { + printf("error: %s\n", mqtt_error_str(mqtt_cfg.client.error)); + goto err_with_socket; + } + else + { + printf("Success: Connected to broker!\n"); + } + + /* Start a thread to refresh the client (handle egress and ingree client + * traffic) + */ + + if (pthread_create(&thrdid, NULL, client_refresher, &mqtt_cfg.client)) + { + printf("ERROR! pthread_create() failed.\n"); + goto err_with_socket; + } + + while (n--) + { + mqtterr = mqtt_publish(&mqtt_cfg.client, mqtt_cfg.topic, + mqtt_cfg.msg, strlen(mqtt_cfg.msg) + 1, + mqtt_cfg.qos); + if (mqtterr != MQTT_OK) + { + printf("ERROR! mqtt_publish() failed\n"); + goto err_with_thrd; + } + + if (mqtt_cfg.client.error != MQTT_OK) + { + printf("error: %s\n", mqtt_error_str(mqtt_cfg.client.error)); + goto err_with_thrd; + } + else + { + printf("Success: Published to broker!\n"); + } + + sleep(5); + } + + printf("\nDisconnecting from %s\n\n", mqtt_cfg.host); + mqtterr = mqtt_disconnect(&mqtt_cfg.client); + if (mqtterr != MQTT_OK) + { + printf("ERROR! mqtt_disconnect() failed\n"); + } + + /* Force sending the DISCONNECT, the thread will be canceled before getting + * the chance to sync this last packet. + * Note however that close() would cleanly close the connection but only + * through TCP (i.e. no MQTT DISCONNECT packet). + */ + + mqtt_sync(&mqtt_cfg.client); + +err_with_thrd: + pthread_cancel(thrdid); +err_with_socket: + close(sockfd); + + return 0; +} +