Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
mvladic committed Feb 13, 2021
1 parent b6d8717 commit d89503b
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 21 deletions.
8 changes: 5 additions & 3 deletions src/eez/modules/mcu/ethernet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,9 @@ void mainLoop(const void *) {
if (event.status == osEventMessage) {
uint8_t eventType = event.value.v & 0xFF;
if (eventType == QUEUE_MESSAGE_PUSH_EVENT) {
mqtt::pushEvent((int16_t)(event.value.v >> 8));
int16_t eventId = event.value.v >> 16;
int8_t channelIndex = (event.value.v >> 8) & 0xFF;
mqtt::pushEvent(eventId, channelIndex);
} else if (eventType == QUEUE_MESSAGE_NTP_STATE_TRANSITION) {
ntp::stateTransition(event.value.v >> 8);
} else {
Expand Down Expand Up @@ -850,9 +852,9 @@ void disconnectClient() {
#endif
}

void pushEvent(int16_t eventId) {
void pushEvent(int16_t eventId, int8_t channelIndex) {
if (!g_shutdownInProgress) {
osMessagePut(g_ethernetMessageQueueId, ((uint32_t)(uint16_t)eventId << 8) | QUEUE_MESSAGE_PUSH_EVENT, 0);
osMessagePut(g_ethernetMessageQueueId, ((uint32_t)(uint16_t)eventId << 16) | ((uint32_t)(uint8_t)channelIndex << 8) | QUEUE_MESSAGE_PUSH_EVENT, channelIndex);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/eez/modules/mcu/ethernet.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void releaseInputBuffer();
int writeBuffer(const char *buffer, uint32_t length);
void disconnectClient();

void pushEvent(int16_t eventId);
void pushEvent(int16_t eventId, int8_t channelIndex);

void ntpStateTransition(int transition);

Expand Down
2 changes: 1 addition & 1 deletion src/eez/modules/psu/event_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ void pushEvent(int16_t eventId, int channelIndex) {
addEventToWriteQueue(eventId, nullptr, channelIndex);

#if OPTION_ETHERNET
eez::mcu::ethernet::pushEvent(eventId);
eez::mcu::ethernet::pushEvent(eventId, (int8_t)channelIndex);
#endif

if (getEventType(eventId) == EVENT_TYPE_ERROR) {
Expand Down
41 changes: 26 additions & 15 deletions src/eez/mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ static uint32_t g_fanStatusTick;

static const size_t EVENT_QUEUE_SIZE = 10;
struct {
int16_t buffer[EVENT_QUEUE_SIZE];
struct {
int16_t eventId;
int8_t channelIndex;
} buffer[EVENT_QUEUE_SIZE];
int head;
int tail;
bool full;
Expand Down Expand Up @@ -453,12 +456,16 @@ bool publish(const char *pubTopic, float value, bool retain) {
return publish(topic, payload, retain);
}

bool publishEvent(int16_t eventId, bool retain) {
bool publishEvent(int16_t eventId, bool retain, int8_t channelIndex) {
char topic[MAX_PUB_TOPIC_LENGTH + 1];
snprintf(topic, sizeof(topic), PUB_TOPIC_SYSTEM_EVENT, persist_conf::devConf.ethernetHostName);

char format[MAX_PAYLOAD_LENGTH + 1];
snprintf(format, MAX_PAYLOAD_LENGTH, "[%d, \"%s\", \"%s\"]", (int)eventId, event_queue::getEventTypeName(eventId), event_queue::getEventMessage(eventId));
format[MAX_PAYLOAD_LENGTH] = 0;

char payload[MAX_PAYLOAD_LENGTH + 1];
snprintf(payload, MAX_PAYLOAD_LENGTH, "[%d, \"%s\", \"%s\"]", (int)eventId, event_queue::getEventTypeName(eventId), event_queue::getEventMessage(eventId));
snprintf(payload, MAX_PAYLOAD_LENGTH, format, (int)(channelIndex + 1));
payload[MAX_PAYLOAD_LENGTH] = 0;

return publish(topic, payload, retain);
Expand Down Expand Up @@ -538,8 +545,8 @@ const char *getClientId() {
return g_clientId;
}

bool peekEvent(int16_t &eventId);
bool getEvent(int16_t &eventId);
bool peekEvent(int16_t &eventId, int8_t &channelIndex);
bool getEvent(int16_t &eventId, int8_t &channelIndex);

void setState(ConnectionState connectionState) {
if (connectionState == CONNECTION_STATE_CONNECTED) {
Expand Down Expand Up @@ -621,9 +628,10 @@ void tick() {

// publish events from event view
int16_t eventId;
if (peekEvent(eventId)) {
if (publishEvent(eventId, true)) {
getEvent(eventId);
int8_t channelIndex;
if (peekEvent(eventId, channelIndex)) {
if (publishEvent(eventId, true, channelIndex)) {
getEvent(eventId, channelIndex);
if (g_publishing) {
return;
}
Expand Down Expand Up @@ -969,12 +977,13 @@ void reconnect() {
}
}

void pushEvent(int16_t eventId) {
if (g_connectionState == CONNECTION_STATE_CONNECTED && publishEvent(eventId, true)) {
void pushEvent(int16_t eventId, int8_t channelIndex) {
if (g_connectionState == CONNECTION_STATE_CONNECTED && publishEvent(eventId, true, channelIndex)) {
return;
}

g_eventQueue.buffer[g_eventQueue.head] = eventId;
g_eventQueue.buffer[g_eventQueue.head].eventId = eventId;
g_eventQueue.buffer[g_eventQueue.head].channelIndex = channelIndex;

// advance
if (g_eventQueue.full) {
Expand All @@ -984,18 +993,20 @@ void pushEvent(int16_t eventId) {
g_eventQueue.full = g_eventQueue.head == g_eventQueue.tail;
}

bool peekEvent(int16_t &eventId) {
bool peekEvent(int16_t &eventId, int8_t &channelIndex) {
if (g_eventQueue.full || g_eventQueue.tail != g_eventQueue.head) {
eventId = g_eventQueue.buffer[g_eventQueue.tail];
eventId = g_eventQueue.buffer[g_eventQueue.tail].eventId;
channelIndex = g_eventQueue.buffer[g_eventQueue.tail].channelIndex;
return true;
}

return false;
}

bool getEvent(int16_t &eventId) {
bool getEvent(int16_t &eventId, int8_t &channelIndex) {
if (g_eventQueue.full || g_eventQueue.tail != g_eventQueue.head) {
eventId = g_eventQueue.buffer[g_eventQueue.tail];
eventId = g_eventQueue.buffer[g_eventQueue.tail].eventId;
channelIndex = g_eventQueue.buffer[g_eventQueue.tail].channelIndex;
g_eventQueue.tail = (g_eventQueue.tail + 1) % EVENT_QUEUE_SIZE;
g_eventQueue.full = false;
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/eez/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ extern ConnectionState g_connectionState;

void tick();
void reconnect();
void pushEvent(int16_t eventId);
void pushEvent(int16_t eventId, int8_t channelIndex);

} // mqtt
} // eez

0 comments on commit d89503b

Please sign in to comment.