Commit bccc6e9d authored by Sjoerd Crooijmans's avatar Sjoerd Crooijmans
Browse files

Merge branch 'improved-code-style' into 'master'

Improved code style

See merge request clean-and-itasks/contrib/mqttclient!7
parents f6f1f763 97633821
Pipeline #46358 passed with stages
in 2 minutes and 41 seconds
......@@ -10,5 +10,12 @@ Clean System Files
*.prj
*.prp
# Object files
*.o
*.obj
# System specific files
src/_MQTT.*
# Other
.clang-format
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <errno.h>
#ifdef _WIN32
#include <winsock2.h>
#else
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/socket.h>
#endif
#include "wolfmqtt/mqtt_client.h"
#include "Clean.h"
#include "wolfmqtt/mqtt_client.h"
#define BUFFER_SIZE 2048
typedef struct Msg {
bool done;
int qos;
bool retain;
int topic_len;
byte *topic_buf;
int payload_len;
byte *payload_buf;
struct Msg* next;
bool done;
int qos;
bool retain;
int topic_len;
byte *topic_buf;
int payload_len;
byte *payload_buf;
struct Msg *next;
} Msg;
typedef struct MqttCtx {
Msg* queue_head;
Msg* queue_tail;
uint16_t packetId;
int fd;
Msg *queue_head;
Msg *queue_tail;
uint16_t packetId;
int fd;
} MqttCtx;
int rc = 0;
......@@ -37,420 +37,408 @@ int rc = 0;
// Mqttwrapper specific exit codes
#define MQTT_CODE_INCORRECT_PROTOCOL -700
#define MQTT_CODE_CONNECTION_REFUSED -701
#define MQTT_CODE_AUTH_FAILED -702
#define MQTT_CODE_QUEUE_EMPTY -800
#define MQTT_CODE_AUTH_FAILED -702
#define MQTT_CODE_QUEUE_EMPTY -800
static int mqtt_message_cb(MqttClient *client, MqttMessage *mqqt_msg, byte msg_new, byte msg_done)
static int mqtt_message_cb(MqttClient *client, MqttMessage *mqqt_msg,
byte msg_new, byte msg_done)
{
MqttCtx* ctx = (MqttCtx*) client->ctx;
if (msg_new) {
Msg* msg = malloc(sizeof(Msg));
if (msg == NULL) {
return MQTT_CODE_ERROR_MEMORY;
}
/* Topic */
byte* topic_buf = malloc(mqqt_msg->topic_name_len);
if (topic_buf == NULL) {
return MQTT_CODE_ERROR_MEMORY;
}
memcpy(topic_buf, mqqt_msg->topic_name, mqqt_msg->topic_name_len);
msg->topic_len = mqqt_msg->topic_name_len;
msg->topic_buf = topic_buf;
/* Payload */
byte* payload_buf = malloc(mqqt_msg->total_len);
if (payload_buf == NULL) {
return MQTT_CODE_ERROR_MEMORY;
}
memcpy(payload_buf, mqqt_msg->buffer, mqqt_msg->buffer_len);
msg->payload_len = mqqt_msg->total_len;
msg->payload_buf = payload_buf;
/* Opts */
msg->qos = mqqt_msg->qos;
msg->retain = mqqt_msg->retain;
msg->next = NULL;
if (ctx->queue_head != NULL) {
ctx->queue_head->next = msg;
}
ctx->queue_head = msg;
if(ctx->queue_tail == NULL) {
ctx->queue_tail = msg;
}
}else {
memcpy(ctx->queue_head->payload_buf + mqqt_msg->buffer_pos, mqqt_msg->buffer, mqqt_msg->buffer_len);
}
if (msg_done) {
ctx->queue_head->done = true;
}
return MQTT_CODE_SUCCESS;
MqttCtx *ctx = (MqttCtx *)client->ctx;
if (msg_new) {
Msg *msg = malloc(sizeof(Msg));
if (msg == NULL) {
return MQTT_CODE_ERROR_MEMORY;
}
/* Topic */
byte *topic_buf = malloc(mqqt_msg->topic_name_len);
if (topic_buf == NULL) {
return MQTT_CODE_ERROR_MEMORY;
}
memcpy(topic_buf, mqqt_msg->topic_name,
mqqt_msg->topic_name_len);
msg->topic_len = mqqt_msg->topic_name_len;
msg->topic_buf = topic_buf;
/* Payload */
byte *payload_buf = malloc(mqqt_msg->total_len);
if (payload_buf == NULL) {
return MQTT_CODE_ERROR_MEMORY;
}
memcpy(payload_buf, mqqt_msg->buffer, mqqt_msg->buffer_len);
msg->payload_len = mqqt_msg->total_len;
msg->payload_buf = payload_buf;
/* Opts */
msg->qos = mqqt_msg->qos;
msg->retain = mqqt_msg->retain;
msg->next = NULL;
if (ctx->queue_head != NULL) {
ctx->queue_head->next = msg;
}
ctx->queue_head = msg;
if (ctx->queue_tail == NULL) {
ctx->queue_tail = msg;
}
} else {
memcpy(ctx->queue_head->payload_buf + mqqt_msg->buffer_pos,
mqqt_msg->buffer, mqqt_msg->buffer_len);
}
if (msg_done) {
ctx->queue_head->done = true;
}
return MQTT_CODE_SUCCESS;
}
static int read_cb(void *context, byte* buf, int buf_len, int timeout_ms)
static int read_cb(void *context, byte *buf, int buf_len, int timeout_ms)
{
int rc;
MqttCtx *ctx = (MqttCtx*) context;
fd_set set;
int b = 0;
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = timeout_ms * 1000;
FD_ZERO (&set);
FD_SET (ctx->fd, &set);
while (b < buf_len) {
/* Wait till there is data to read */
if (select(FD_SETSIZE, &set, NULL, NULL, &tv) == 0) {
if (b == 0) {
return MQTT_CODE_ERROR_TIMEOUT;
}
return b;
}
rc = (int)recv(ctx->fd, (char *)&buf[b], buf_len - b, 0);
int rc;
MqttCtx *ctx = (MqttCtx *)context;
fd_set set;
int b = 0;
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = timeout_ms * 1000;
FD_ZERO(&set);
FD_SET(ctx->fd, &set);
while (b < buf_len) {
/* Wait till there is data to read */
if (select(FD_SETSIZE, &set, NULL, NULL, &tv) == 0) {
if (b == 0) {
return MQTT_CODE_ERROR_TIMEOUT;
}
return b;
}
rc = (int)recv(ctx->fd, (char *)&buf[b], buf_len - b, 0);
#ifdef _WIN32
if (rc == SOCKET_ERROR) {
if (WSAGetLastError() == WSAEWOULDBLOCK) {
continue;
}
if (rc == SOCKET_ERROR) {
if (WSAGetLastError() == WSAEWOULDBLOCK) {
continue;
}
#else
if (rc < 0) { // Error occured
/* continue if it is non-blocking error */
if (errno == EAGAIN || errno == EWOULDBLOCK){
continue;
}
if (rc < 0) { // Error occured
/* continue if it is non-blocking error */
if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
}
#endif
return MQTT_CODE_ERROR_NETWORK;
}
b += rc;
}
return MQTT_CODE_ERROR_NETWORK;
}
b += rc;
}
return b;
return b;
}
static int write_cb(void *context, const byte* buf, int buf_len, int timeout_ms)
static int write_cb(void *context, const byte *buf, int buf_len, int timeout_ms)
{
MqttCtx *ctx = (MqttCtx*) context;
int totalSend = 0;
while (totalSend < buf_len) {
rc = (int)send(ctx->fd, (char *)buf, buf_len, 0);
totalSend += rc;
if (rc < 0) {
return MQTT_CODE_ERROR_NETWORK;
}
}
return rc;
MqttCtx *ctx = (MqttCtx *)context;
int totalSend = 0;
while (totalSend < buf_len) {
rc = (int)send(ctx->fd, (char *)buf, buf_len, 0);
totalSend += rc;
if (rc < 0) {
return MQTT_CODE_ERROR_NETWORK;
}
}
return rc;
}
static int connect_cb(void *context, const char* host, word16 port, int timeout_ms)
static int connect_cb(void *context, const char *host, word16 port,
int timeout_ms)
{
/* Connection is made in Clean */
return MQTT_CODE_SUCCESS;
/* Connection is made in Clean */
return MQTT_CODE_SUCCESS;
}
static int disconnect_cb(void *context)
{
/* Disconnect is done in Clean */
return MQTT_CODE_SUCCESS;
/* Disconnect is done in Clean */
return MQTT_CODE_SUCCESS;
}
MqttClient* mqtt_connect(
int fd,
CleanCharArray client_id,
int keep_alive,
int clean_session,
CleanCharArray username,
CleanCharArray password,
CleanCharArray lw_topic,
CleanCharArray lw_msg,
int qos,
int retain,
int *ret
)
{
MqttCtx *ctx = calloc(1, sizeof(MqttCtx));
if (ctx == NULL) {
*ret = MQTT_CODE_ERROR_MEMORY;
return NULL;
}
ctx->fd = fd;
MqttNet *net = calloc(1, sizeof(MqttNet));
if (net == NULL) {
*ret = MQTT_CODE_ERROR_MEMORY;
return NULL;
}
net->connect = connect_cb;
net->read = read_cb;
net->write = write_cb;
net->disconnect = disconnect_cb;
net->context = ctx;
byte *txBuf = NULL;
byte *rxBuf = NULL;
txBuf = (byte*) malloc(BUFFER_SIZE);
rxBuf = (byte*) malloc(BUFFER_SIZE);
if (txBuf == NULL || rxBuf == NULL) {
*ret = MQTT_CODE_ERROR_MEMORY;
return NULL;
}
MqttClient *mqttClient;
mqttClient = malloc(sizeof(MqttClient));
if (mqttClient == NULL) {
*ret = MQTT_CODE_ERROR_MEMORY;
return NULL;
}
*ret = MqttClient_Init(mqttClient, net, mqtt_message_cb, txBuf, BUFFER_SIZE,
rxBuf, BUFFER_SIZE, 1000);
if (*ret != MQTT_CODE_SUCCESS) {
return NULL;
}
mqttClient->ctx = ctx;
// Connect
MqttConnect connect;
memset(&connect, 0, sizeof(connect));
connect.keep_alive_sec = keep_alive;
connect.client_id = (char*) client_id;
connect.clean_session = clean_session;
connect.username = (char*) username;
connect.password = (char*) password;
if (strlen((char*) lw_topic) > 0) {
MqttMessage lwt_msg;
memset(&lwt_msg, 0, sizeof(lwt_msg));
connect.lwt_msg = &lwt_msg;
connect.enable_lwt = true;
lwt_msg.qos = qos;
lwt_msg.retain = retain;
lwt_msg.topic_name = (char*) lw_topic;
lwt_msg.buffer = (byte*) lw_msg;
lwt_msg.total_len = (word16)strlen((char*) lw_msg);
}
*ret = MqttClient_Connect(mqttClient, &connect);
// Check if the request was successfull
if (*ret != 0) {
return NULL;
}
// Check if the request was accepted
switch (connect.ack.return_code) {
case 1:
*ret = MQTT_CODE_INCORRECT_PROTOCOL;
return NULL;
break;
case 2:
case 3:
*ret = MQTT_CODE_CONNECTION_REFUSED;
return NULL;
break;
case 4:
case 5:
*ret = MQTT_CODE_AUTH_FAILED;
return NULL;
break;
};
*ret = 0;
return mqttClient;
MqttClient *mqtt_connect(int fd, CleanCharArray client_id, int keep_alive,
int clean_session, CleanCharArray username,
CleanCharArray password, CleanCharArray lw_topic,
CleanCharArray lw_msg, int qos, int retain, int *ret)
{
MqttCtx *ctx = calloc(1, sizeof(MqttCtx));
if (ctx == NULL) {
*ret = MQTT_CODE_ERROR_MEMORY;
return NULL;
}
ctx->fd = fd;
MqttNet *net = calloc(1, sizeof(MqttNet));
if (net == NULL) {
*ret = MQTT_CODE_ERROR_MEMORY;
return NULL;
}
net->connect = connect_cb;
net->read = read_cb;
net->write = write_cb;
net->disconnect = disconnect_cb;
net->context = ctx;
byte *txBuf = NULL;
byte *rxBuf = NULL;
txBuf = (byte *)malloc(BUFFER_SIZE);
rxBuf = (byte *)malloc(BUFFER_SIZE);
if (txBuf == NULL || rxBuf == NULL) {
*ret = MQTT_CODE_ERROR_MEMORY;
return NULL;
}
MqttClient *mqttClient;
mqttClient = malloc(sizeof(MqttClient));
if (mqttClient == NULL) {
*ret = MQTT_CODE_ERROR_MEMORY;
return NULL;
}
*ret = MqttClient_Init(mqttClient, net, mqtt_message_cb, txBuf,
BUFFER_SIZE, rxBuf, BUFFER_SIZE, 1000);
if (*ret != MQTT_CODE_SUCCESS) {
return NULL;
}
mqttClient->ctx = ctx;
// Connect
MqttConnect connect;
memset(&connect, 0, sizeof(connect));
connect.keep_alive_sec = keep_alive;
connect.client_id = (char *)client_id;
connect.clean_session = clean_session;
connect.username = (char *)username;
connect.password = (char *)password;
if (strlen((char *)lw_topic) > 0) {
MqttMessage lwt_msg;
memset(&lwt_msg, 0, sizeof(lwt_msg));
connect.lwt_msg = &lwt_msg;
connect.enable_lwt = true;
lwt_msg.qos = qos;
lwt_msg.retain = retain;
lwt_msg.topic_name = (char *)lw_topic;
lwt_msg.buffer = (byte *)lw_msg;
lwt_msg.total_len = (word16)strlen((char *)lw_msg);
}
*ret = MqttClient_Connect(mqttClient, &connect);
// Check if the request was successfull
if (*ret != 0) {
return NULL;
}
// Check if the request was accepted
switch (connect.ack.return_code) {
case 1:
*ret = MQTT_CODE_INCORRECT_PROTOCOL;
return NULL;
break;
case 2:
case 3:
*ret = MQTT_CODE_CONNECTION_REFUSED;
return NULL;
break;
case 4:
case 5:
*ret = MQTT_CODE_AUTH_FAILED;
return NULL;
break;
};
*ret = 0;
return mqttClient;
}
MqttClient* mqtt_connect_without_auth (
int fd,
CleanCharArray client_id,
int keep_alive,
int clean_session,
CleanCharArray lw_topic,
CleanCharArray lw_msg,
int qos,
int retain,
int *ret
) {
return mqtt_connect(fd, client_id, keep_alive, clean_session, NULL, NULL, lw_topic, lw_msg, qos, retain, ret);
MqttClient *mqtt_connect_without_auth(int fd, CleanCharArray client_id,
int keep_alive, int clean_session,
CleanCharArray lw_topic,
CleanCharArray lw_msg, int qos,
int retain, int *ret)
{
return mqtt_connect(fd, client_id, keep_alive, clean_session, NULL,
NULL, lw_topic, lw_msg, qos, retain, ret);
}
int mqtt_disconnect(MqttClient* mqttClient)
int mqtt_disconnect(MqttClient *mqttClient)
{
rc = MqttClient_Disconnect(mqttClient);
if (rc != 0) {
return rc;
}
// Clean up
free(mqttClient->tx_buf);
free(mqttClient->rx_buf);
MqttCtx* ctx = (MqttCtx*) mqttClient->ctx;
Msg* msg = ctx->queue_tail;
while (msg != NULL) {
Msg* tmp = msg;
msg = msg->next;
free(msg->topic_buf);
free(msg->payload_buf);
free(tmp);
}
free(ctx);
MqttClient_DeInit(mqttClient);
free(mqttClient->net);
free(mqttClient);
return rc;
rc = MqttClient_Disconnect(mqttClient);
if (rc != 0) {
return rc;
}
// Clean up
free(mqttClient->tx_buf);
free(mqttClient->rx_buf);
MqttCtx *ctx = (MqttCtx *)mqttClient->ctx;
Msg *msg = ctx->queue_tail;
while (msg != NULL) {
Msg *tmp = msg;
msg = msg->next;
free(msg->topic_buf);
free(msg->payload_buf);
free(tmp);
}