/****************************************************************************
 * apps/examples/mqttc/mqttc_pub.c
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.  The
 * ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the
 * License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations
 * under the License.
 *
 ****************************************************************************/

/****************************************************************************
 * Included Files
 ****************************************************************************/

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <netdb.h>
#include <errno.h>
#include <mqtt.h>

/****************************************************************************
 * Pre-processor Definitions
 ****************************************************************************/

/****************************************************************************
 * Private Types
 ****************************************************************************/

struct mqttc_cfg_s
{
  struct mqtt_client client;
  const FAR char *host;
  const FAR char *port;
  const FAR char *topic;
  const FAR char *msg;
  const FAR char *id;
  uint8_t sendbuf[CONFIG_EXAMPLES_MQTTC_TXSIZE];
  uint8_t recvbuf[CONFIG_EXAMPLES_MQTTC_RXSIZE];
  uint32_t tmo;
  uint8_t flags;
  uint8_t qos;
};

/****************************************************************************
 * Private Function Prototypes
 ****************************************************************************/

static FAR void *client_refresher(FAR void *data);
static void parsearg(int argc, char *argv[], struct mqttc_cfg_s *cfg,
                     int *n);
static int initserver(const FAR struct mqttc_cfg_s *cfg);

/****************************************************************************
 * Private Functions
 ****************************************************************************/

/****************************************************************************
 * Name: client_refresher
 *
 * Description:
 *   The client's refresher. This function triggers back-end routines to
 *   handle ingress/egress traffic to the broker.
 *
 ****************************************************************************/

static FAR void *client_refresher(FAR void *data)
{
  while (1)
    {
      mqtt_sync((FAR struct mqtt_client *)data);
      usleep(100000U);
    }

  return NULL;
}

/****************************************************************************
 * Name: parsearg
 *
 * Description:
 *   Parse command line arguments.
 *
 ****************************************************************************/

static void parsearg(int argc, char *argv[], struct mqttc_cfg_s *cfg, int *n)
{
  int opt;

  while ((opt = getopt(argc, argv, "h:p:m:t:n:")) != ERROR)
    {
      switch (opt)
        {
          case 'h':
            cfg->host = optarg;
            break;

          case 'p':
            cfg->port = optarg;
            break;

          case 'm':
            cfg->msg = optarg;
            break;

          case 't':
            cfg->topic = optarg;
            break;

          case 'n':
            *n = strtol(optarg, NULL, 10);
            break;

          default:
            fprintf(stderr, "ERROR: Unrecognized option\n");
            break;
        }
    }
}

/****************************************************************************
 * Name: initserver
 *
 * Description:
 *   Resolve server's name and try to establish a connection.
 *
 ****************************************************************************/

static int initserver(const FAR struct mqttc_cfg_s *cfg)
{
  struct addrinfo hints;
  struct addrinfo *servinfo;
  struct addrinfo *itr;
  int fd;
  int ret;

  memset(&hints, 0, sizeof(hints));
  hints.ai_family  = AF_INET;
  hints.ai_socktype = SOCK_STREAM;

  printf("Connecting to %s:%s...\n", cfg->host, cfg->port);

  ret = getaddrinfo(cfg->host, cfg->port, &hints, &servinfo);
  if (ret != OK)
    {
      printf("ERROR! getaddrinfo() failed: %s\n", gai_strerror(ret));
      return -1;
    }

  itr = servinfo;
  do
    {
      fd = socket(itr->ai_family, itr->ai_socktype, itr->ai_protocol);
      if (fd < 0)
        {
          continue;
        }

      ret = connect(fd, itr->ai_addr, itr->ai_addrlen);
      if (ret == 0)
        {
          break;
        }

      close(fd);
      fd = -1;
    }
  while ((itr = itr->ai_next) != NULL);

  freeaddrinfo(servinfo);

  if (fd < 0)
    {
      printf("ERROR! Couldn't create socket\n");
      return -1;
    }

  ret = fcntl(fd, F_GETFL, 0);
  if (ret < 0)
    {
      printf("ERROR! fcntl() F_GETFL failed, errno: %d\n", errno);
      return -1;
    }

  ret = fcntl(fd, F_SETFL, ret | O_NONBLOCK);
  if (ret < 0)
    {
      printf("ERROR! fcntl() F_SETFL failed, errno: %d\n", errno);
      return -1;
    }

  return fd;
}

/****************************************************************************
 * Public Functions
 ****************************************************************************/

int main(int argc, char *argv[])
{
  int sockfd;
  enum MQTTErrors mqtterr;
  pthread_t thrdid;
  int n = 1;
  struct mqttc_cfg_s mqtt_cfg =
    {
      .host = "broker.hivemq.com",
      .port = "1883",
      .topic = "test",
      .msg = "test",
      .flags = MQTT_CONNECT_CLEAN_SESSION,
      .tmo = 400,
      .id = NULL,
      .qos = MQTT_PUBLISH_QOS_0,
    };

  parsearg(argc, argv, &mqtt_cfg, &n);
  sockfd = initserver(&mqtt_cfg);
  if (sockfd < 0)
    {
      return -1;
    }

  mqtterr = mqtt_init(&mqtt_cfg.client, sockfd,
                      mqtt_cfg.sendbuf, sizeof(mqtt_cfg.sendbuf),
                      mqtt_cfg.recvbuf, sizeof(mqtt_cfg.recvbuf),
                      NULL);
  if (mqtterr != MQTT_OK)
    {
      printf("ERRPR! mqtt_init() failed.\n");
      goto err_with_socket;
    }

  mqtterr = mqtt_connect(&mqtt_cfg.client, mqtt_cfg.id,
                         NULL, /* Will topic */
                         NULL, /* Will message */
                         0,    /* Will message size */
                         NULL, /* User name */
                         NULL, /* Password */
                         mqtt_cfg.flags, mqtt_cfg.tmo);

  if (mqtterr != MQTT_OK)
    {
      printf("ERROR! mqtt_connect() failed\n");
      goto err_with_socket;
    }

  if (mqtt_cfg.client.error != MQTT_OK)
    {
      printf("error: %s\n", mqtt_error_str(mqtt_cfg.client.error));
      goto err_with_socket;
    }
  else
    {
      printf("Success: Connected to broker!\n");
    }

  /* Start a thread to refresh the client (handle egress and ingree client
   * traffic)
   */

  if (pthread_create(&thrdid, NULL, client_refresher, &mqtt_cfg.client))
    {
      printf("ERROR! pthread_create() failed.\n");
      goto err_with_socket;
    }

  while (n--)
    {
      mqtterr = mqtt_publish(&mqtt_cfg.client, mqtt_cfg.topic,
                             mqtt_cfg.msg, strlen(mqtt_cfg.msg) + 1,
                             mqtt_cfg.qos);
      if (mqtterr != MQTT_OK)
        {
          printf("ERROR! mqtt_publish() failed\n");
          goto err_with_thrd;
        }

      if (mqtt_cfg.client.error != MQTT_OK)
        {
          printf("error: %s\n", mqtt_error_str(mqtt_cfg.client.error));
          goto err_with_thrd;
        }
      else
        {
          printf("Success: Published to broker!\n");
        }

      sleep(5);
    }

  printf("\nDisconnecting from %s\n\n", mqtt_cfg.host);
  mqtterr = mqtt_disconnect(&mqtt_cfg.client);
  if (mqtterr != MQTT_OK)
    {
      printf("ERROR! mqtt_disconnect() failed\n");
    }

  /* Force sending the DISCONNECT, the thread will be canceled before getting
   * the chance to sync this last packet.
   * Note however that close() would cleanly close the connection but only
   * through TCP (i.e. no MQTT DISCONNECT packet).
   */

  mqtt_sync(&mqtt_cfg.client);

err_with_thrd:
  pthread_cancel(thrdid);
err_with_socket:
  close(sockfd);

  return 0;
}