examples: Add an MQTT-C example.

Signed-off-by: Abdelatif Guettouche <abdelatif.guettouche@espressif.com>
This commit is contained in:
Abdelatif Guettouche 2021-03-27 19:25:03 +01:00 committed by Xiang Xiao
parent e35ea9486e
commit 6ae35ffc71
5 changed files with 412 additions and 0 deletions

31
examples/mqttc/Kconfig Normal file
View File

@ -0,0 +1,31 @@
#
# For a description of the syntax of this configuration file,
# see the file kconfig-language.txt in the NuttX tools repository.
#
config EXAMPLES_MQTTC
tristate "Enable MQTT-C Example"
default n
depends on NETUTILS_MQTTC
---help---
Enable a simple MQTT-C publisher example
if EXAMPLES_MQTTC
config EXAMPLES_MQTTC_PROGNAME
string "Program name"
default "mqttc_pub"
config EXAMPLES_MQTTC_STACKSIZE
int "Task's stack size"
default 8192
config EXAMPLES_MQTTC_TXSIZE
int "TX Buffer size"
default 256
config EXAMPLES_MQTTC_RXSIZE
int "RX Buffer size"
default 256
endif

23
examples/mqttc/Make.defs Normal file
View File

@ -0,0 +1,23 @@
############################################################################
# apps/examples/mqttc/Make.defs
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership. The
# ASF licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
############################################################################
ifneq ($(CONFIG_EXAMPLES_MQTTC),)
CONFIGURED_APPS += $(APPDIR)/examples/mqttc
endif

12
examples/mqttc/Makefile Normal file
View File

@ -0,0 +1,12 @@
include $(APPDIR)/Make.defs
PROGNAME = $(CONFIG_EXAMPLES_MQTTC_PROGNAME)
PRIORITY = SCHED_PRIORITY_DEFAULT
STACKSIZE = $(CONFIG_EXAMPLES_MQTTC_STACKSIZE)
MODULE = $(CONFIG_EXAMPLES_MQTTC)
# MQTT-C example source code
MAINSRC = mqttc_pub.c
include $(APPDIR)/Application.mk

25
examples/mqttc/README.md Normal file
View File

@ -0,0 +1,25 @@
This is a simple MQTT publisher example using MQTT-C
By default it publishes to the "test" topic and exits. Default behaviour
including, host, port, topic, message and loop count can be changed through
different arguments.
To test:
From the host start an MQTT broker and subscribe to the "test" topic. Here
mosquitto is used:
```
mosquitto&
mosquitto_sub -t test
```
Make sure that mosquitto is configured in local mode only.
From the nsh:
Launch the built-in app `mqttc_pub` specifying the host:
```
mqttc_pub -h HOST
```
The target will publish the message "test".

321
examples/mqttc/mqttc_pub.c Normal file
View File

@ -0,0 +1,321 @@
/****************************************************************************
* apps/examples/mqttc/mqttc_pub.c
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The
* ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
****************************************************************************/
/****************************************************************************
* Included Files
****************************************************************************/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <netdb.h>
#include <errno.h>
#include <mqtt.h>
/****************************************************************************
* Pre-processor Definitions
****************************************************************************/
/****************************************************************************
* Private Types
****************************************************************************/
struct mqttc_cfg_s
{
struct mqtt_client client;
const FAR char *host;
const FAR char *port;
const FAR char *topic;
const FAR char *msg;
const FAR char *id;
uint8_t sendbuf[CONFIG_EXAMPLES_MQTTC_TXSIZE];
uint8_t recvbuf[CONFIG_EXAMPLES_MQTTC_RXSIZE];
uint32_t tmo;
uint8_t flags;
uint8_t qos;
};
/****************************************************************************
* Private Function Prototypes
****************************************************************************/
static FAR void *client_refresher(FAR void *data);
static void parsearg(int argc, char *argv[], struct mqttc_cfg_s *cfg,
int *n);
static int initserver(const FAR struct mqttc_cfg_s *cfg);
/****************************************************************************
* Private Functions
****************************************************************************/
/****************************************************************************
* Name: client_refresher
*
* Description:
* The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
****************************************************************************/
static FAR void *client_refresher(FAR void *data)
{
while (1)
{
mqtt_sync((FAR struct mqtt_client *)data);
usleep(100000U);
}
return NULL;
}
/****************************************************************************
* Name: parsearg
*
* Description:
* Parse command line arguments.
*
****************************************************************************/
static void parsearg(int argc, char *argv[], struct mqttc_cfg_s *cfg, int *n)
{
int opt;
while ((opt = getopt(argc, argv, "h:p:m:t:n:")) != ERROR)
{
switch (opt)
{
case 'h':
cfg->host = optarg;
break;
case 'p':
cfg->port = optarg;
break;
case 'm':
cfg->msg = optarg;
break;
case 't':
cfg->topic = optarg;
break;
case 'n':
*n = strtol(optarg, NULL, 10);
break;
default:
fprintf(stderr, "ERROR: Unrecognized option\n");
break;
}
}
}
/****************************************************************************
* Name: initserver
*
* Description:
* Resolve server's name and try to establish a connection.
*
****************************************************************************/
static int initserver(const FAR struct mqttc_cfg_s *cfg)
{
struct addrinfo hints;
struct addrinfo *servinfo;
struct addrinfo *itr;
int fd;
int ret;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
ret = getaddrinfo(cfg->host, cfg->port, &hints, &servinfo);
if (ret < 0)
{
printf("ERROR! getaddrinfo() failed:%s\n", gai_strerror(ret));
}
itr = servinfo;
do
{
fd = socket(itr->ai_family, itr->ai_socktype, itr->ai_protocol);
if (fd < 0)
{
continue;
}
ret = connect(fd, itr->ai_addr, itr->ai_addrlen);
if (ret == 0)
{
break;
}
close(fd);
}
while ((itr = itr->ai_next) != NULL);
freeaddrinfo(servinfo);
if (fd < 0)
{
printf("ERROR! Couldn't create socket\n");
return -1;
}
ret = fcntl(fd, F_GETFL, 0);
if (ret < 0)
{
printf("ERROR! fcntl() F_GETFL failed, errno:%d\n", errno);
return -1;
}
ret = fcntl(fd, F_SETFL, ret | O_NONBLOCK);
if (ret < 0)
{
printf("ERROR! fcntl() F_SETFL failed, errno:%d\n", errno);
return -1;
}
return fd;
}
/****************************************************************************
* Public Functions
****************************************************************************/
int main(int argc, char *argv[])
{
int sockfd;
enum MQTTErrors mqtterr;
pthread_t thrdid;
int n = 1;
struct mqttc_cfg_s mqtt_cfg =
{
.port = "1883",
.topic = "test",
.msg = "test",
.flags = MQTT_CONNECT_CLEAN_SESSION,
.tmo = 400,
.id = NULL,
.qos = MQTT_PUBLISH_QOS_0,
};
parsearg(argc, argv, &mqtt_cfg, &n);
sockfd = initserver(&mqtt_cfg);
if (sockfd < 0)
{
return -1;
}
mqtterr = mqtt_init(&mqtt_cfg.client, sockfd,
mqtt_cfg.sendbuf, sizeof(mqtt_cfg.sendbuf),
mqtt_cfg.recvbuf, sizeof(mqtt_cfg.recvbuf),
NULL);
if (mqtterr != MQTT_OK)
{
printf("ERRPR! mqtt_init() failed.\n");
goto err_with_socket;
}
mqtterr = mqtt_connect(&mqtt_cfg.client, mqtt_cfg.id,
NULL, /* Will topic */
NULL, /* Will message */
0, /* Will message size */
NULL, /* User name */
NULL, /* Password */
mqtt_cfg.flags, mqtt_cfg.tmo);
if (mqtterr != MQTT_OK)
{
printf("ERROR! mqtt_connect() failed\n");
goto err_with_socket;
}
if (mqtt_cfg.client.error != MQTT_OK)
{
printf("error: %s\n", mqtt_error_str(mqtt_cfg.client.error));
goto err_with_socket;
}
else
{
printf("Success: Connected to broker!\n");
}
/* Start a thread to refresh the client (handle egress and ingree client
* traffic)
*/
if (pthread_create(&thrdid, NULL, client_refresher, &mqtt_cfg.client))
{
printf("ERROR! pthread_create() failed.\n");
goto err_with_socket;
}
while (n--)
{
mqtterr = mqtt_publish(&mqtt_cfg.client, mqtt_cfg.topic,
mqtt_cfg.msg, strlen(mqtt_cfg.msg) + 1,
mqtt_cfg.qos);
if (mqtterr != MQTT_OK)
{
printf("ERROR! mqtt_publish() failed\n");
goto err_with_thrd;
}
if (mqtt_cfg.client.error != MQTT_OK)
{
printf("error: %s\n", mqtt_error_str(mqtt_cfg.client.error));
goto err_with_thrd;
}
else
{
printf("Success: Published to broker!\n");
}
sleep(5);
}
printf("\nDisconnecting from %s\n\n", mqtt_cfg.host);
mqtterr = mqtt_disconnect(&mqtt_cfg.client);
if (mqtterr != MQTT_OK)
{
printf("ERROR! mqtt_disconnect() failed\n");
}
/* Force sending the DISCONNECT, the thread will be canceled before getting
* the chance to sync this last packet.
* Note however that close() would cleanly close the connection but only
* through TCP (i.e. no MQTT DISCONNECT packet).
*/
mqtt_sync(&mqtt_cfg.client);
err_with_thrd:
pthread_cancel(thrdid);
err_with_socket:
close(sockfd);
return 0;
}