|
| 1 | +/* Multi-client Server Sent Event (aka EventSource) demo |
| 2 | + Run demo as follows: |
| 3 | + 1. set SSID, password and ports, compile and run program |
| 4 | + you should see (random) updates of sensors A and B |
| 5 | +
|
| 6 | + 2. on the client(s), register it for the event bus using a REST API call: curl -sS "http://<your ESP IP>:<your port>/rest/events/subscribe" |
| 7 | + on both server and client, you should now see that your client is registered |
| 8 | + the server sends back the location of the event bus (channel) to the client: |
| 9 | + subscription for client IP <your client's IP address>: event bus location: http://<your ESP IP>:<your port>/rest/events/<channel> |
| 10 | +
|
| 11 | + you will also see that the sensors are ready to broadcast state changes, but the client is not yet listening: |
| 12 | + SSEBroadcastState - client <your client IP>> registered but not listening |
| 13 | +
|
| 14 | + 3. on the client(s), start listening for events with: curl -sS "http://<your ESP IP>:<your port>/rest/events/<channel>" |
| 15 | + if all is well, the following is being displayed on the ESP console |
| 16 | + SSEHandler - registered client with IP <your client IP address> is listening... |
| 17 | + broadcast status change to client IP <your client IP>> for sensor[A|B] with new state <some number>> |
| 18 | + every minute you will see on the ESP: SSEKeepAlive - client is still connected |
| 19 | +
|
| 20 | + on the client, you should see the SSE messages coming in: |
| 21 | + event: event |
| 22 | + data: { "TYPE":"KEEP-ALIVE" } |
| 23 | + event: event |
| 24 | + data: { "TYPE":"STATE", "sensorB": {"state" : 12408, "prevState": 13502} } |
| 25 | + event: event |
| 26 | + data: { "TYPE":"STATE", "sensorA": {"state" : 17664, "prevState": 49362} } |
| 27 | +
|
| 28 | + 4. on the client, stop listening by hitting control-C |
| 29 | + on the ESP, after maximum one minute, the following message is displayed: SSEKeepAlive - client no longer connected, remove subscription |
| 30 | + if you start listening again after the time expired, the "/rest/events" handle becomes stale and "Handle not found" is returned |
| 31 | + you can also try to start listening again before the KeepAliver timer expires or simply register your client again |
| 32 | +*/ |
| 33 | + |
| 34 | +extern "C" { |
| 35 | +#include "c_types.h" |
| 36 | +} |
| 37 | +#include <ESP8266WiFi.h> |
| 38 | +#include <WiFiClient.h> |
| 39 | +#include <ESP8266WebServer.h> |
| 40 | +#include <ESP8266mDNS.h> |
| 41 | +#include <Ticker.h> |
| 42 | + |
| 43 | +#ifndef STASSID |
| 44 | +#define STASSID "your-ssid" |
| 45 | +#define STAPSK "your-password" |
| 46 | +#endif |
| 47 | + |
| 48 | +const char* ssid = STASSID; |
| 49 | +const char* password = STAPSK; |
| 50 | +const unsigned int port = 80; |
| 51 | + |
| 52 | +ESP8266WebServer server(port); |
| 53 | + |
| 54 | +#define SSE_MAX_CHANNELS 8 // in this simplified example, only eight SSE clients subscription allowed |
| 55 | +struct SSESubscription { |
| 56 | + IPAddress clientIP; |
| 57 | + WiFiClient client; |
| 58 | + Ticker keepAliveTimer; |
| 59 | +} subscription[SSE_MAX_CHANNELS]; |
| 60 | +uint8_t subscriptionCount = 0; |
| 61 | + |
| 62 | +typedef struct { |
| 63 | + const char *name; |
| 64 | + unsigned short value; |
| 65 | + Ticker update; |
| 66 | +} sensorType; |
| 67 | +sensorType sensor[2]; |
| 68 | + |
| 69 | +void handleNotFound() { |
| 70 | + Serial.println(F("Handle not found")); |
| 71 | + String message = "Handle Not Found\n\n"; |
| 72 | + message += "URI: "; |
| 73 | + message += server.uri(); |
| 74 | + message += "\nMethod: "; |
| 75 | + message += (server.method() == HTTP_GET) ? "GET" : "POST"; |
| 76 | + message += "\nArguments: "; |
| 77 | + message += server.args(); |
| 78 | + message += "\n"; |
| 79 | + for (uint8_t i = 0; i < server.args(); i++) { |
| 80 | + message += " " + server.argName(i) + ": " + server.arg(i) + "\n"; |
| 81 | + } |
| 82 | + server.send(404, "text/plain", message); |
| 83 | +} |
| 84 | + |
| 85 | +void SSEKeepAlive() { |
| 86 | + for (uint8_t i = 0; i < SSE_MAX_CHANNELS; i++) { |
| 87 | + if (!(subscription[i].clientIP)) { |
| 88 | + continue; |
| 89 | + } |
| 90 | + if (subscription[i].client.connected()) { |
| 91 | + Serial.printf_P(PSTR("SSEKeepAlive - client is still listening on channel %d\n"), i); |
| 92 | + subscription[i].client.println(F("event: event\ndata: { \"TYPE\":\"KEEP-ALIVE\" }\n")); // Extra newline required by SSE standard |
| 93 | + } else { |
| 94 | + Serial.printf_P(PSTR("SSEKeepAlive - client not listening on channel %d, remove subscription\n"), i); |
| 95 | + subscription[i].keepAliveTimer.detach(); |
| 96 | + subscription[i].client.flush(); |
| 97 | + subscription[i].client.stop(); |
| 98 | + subscription[i].clientIP = INADDR_NONE; |
| 99 | + subscriptionCount--; |
| 100 | + } |
| 101 | + } |
| 102 | +} |
| 103 | + |
| 104 | +// SSEHandler handles the client connection to the event bus (client event listener) |
| 105 | +// every 60 seconds it sends a keep alive event via Ticker |
| 106 | +void SSEHandler(uint8_t channel) { |
| 107 | + WiFiClient client = server.client(); |
| 108 | + SSESubscription &s = subscription[channel]; |
| 109 | + if (s.clientIP != client.remoteIP()) { // IP addresses don't match, reject this client |
| 110 | + Serial.printf_P(PSTR("SSEHandler - unregistered client with IP %s tries to listen\n"), server.client().remoteIP().toString().c_str()); |
| 111 | + return handleNotFound(); |
| 112 | + } |
| 113 | + client.setNoDelay(true); |
| 114 | + client.setSync(true); |
| 115 | + Serial.printf_P(PSTR("SSEHandler - registered client with IP %s is listening\n"), IPAddress(s.clientIP).toString().c_str()); |
| 116 | + s.client = client; // capture SSE server client connection |
| 117 | + server.setContentLength(CONTENT_LENGTH_UNKNOWN); // the payload can go on forever |
| 118 | + server.sendContent_P(PSTR("HTTP/1.1 200 OK\nContent-Type: text/event-stream;\nConnection: keep-alive\nCache-Control: no-cache\nAccess-Control-Allow-Origin: *\n\n")); |
| 119 | + s.keepAliveTimer.attach_scheduled(30.0, SSEKeepAlive); // Refresh time every 30s for demo |
| 120 | +} |
| 121 | + |
| 122 | +void handleAll() { |
| 123 | + const char *uri = server.uri().c_str(); |
| 124 | + const char *restEvents = PSTR("/rest/events/"); |
| 125 | + if (strncmp_P(uri, restEvents, strlen_P(restEvents))) { |
| 126 | + return handleNotFound(); |
| 127 | + } |
| 128 | + uri += strlen_P(restEvents); // Skip the "/rest/events/" and get to the channel number |
| 129 | + unsigned int channel = atoi(uri); |
| 130 | + if (channel < SSE_MAX_CHANNELS) { |
| 131 | + return SSEHandler(channel); |
| 132 | + } |
| 133 | + handleNotFound(); |
| 134 | +}; |
| 135 | + |
| 136 | +void SSEBroadcastState(const char *sensorName, unsigned short prevSensorValue, unsigned short sensorValue) { |
| 137 | + for (uint8_t i = 0; i < SSE_MAX_CHANNELS; i++) { |
| 138 | + if (!(subscription[i].clientIP)) { |
| 139 | + continue; |
| 140 | + } |
| 141 | + String IPaddrstr = IPAddress(subscription[i].clientIP).toString(); |
| 142 | + if (subscription[i].client.connected()) { |
| 143 | + Serial.printf_P(PSTR("broadcast status change to client IP %s on channel %d for %s with new state %d\n"), |
| 144 | + IPaddrstr.c_str(), i, sensorName, sensorValue); |
| 145 | + subscription[i].client.printf_P(PSTR("event: event\ndata: {\"TYPE\":\"STATE\", \"%s\":{\"state\":%d, \"prevState\":%d}}\n\n"), |
| 146 | + sensorName, sensorValue, prevSensorValue); |
| 147 | + } else { |
| 148 | + Serial.printf_P(PSTR("SSEBroadcastState - client %s registered on channel %d but not listening\n"), IPaddrstr.c_str(), i); |
| 149 | + } |
| 150 | + } |
| 151 | +} |
| 152 | + |
| 153 | +// Simulate sensors |
| 154 | +void updateSensor(sensorType &sensor) { |
| 155 | + unsigned short newVal = (unsigned short)RANDOM_REG32; // (not so good) random value for the sensor |
| 156 | + Serial.printf_P(PSTR("update sensor %s - previous state: %d, new state: %d\n"), sensor.name, sensor.value, newVal); |
| 157 | + if (sensor.value != newVal) { |
| 158 | + SSEBroadcastState(sensor.name, sensor.value, newVal); // only broadcast if state is different |
| 159 | + } |
| 160 | + sensor.value = newVal; |
| 161 | + sensor.update.once(rand() % 20 + 10, std::bind(updateSensor, sensor)); // randomly update sensor |
| 162 | +} |
| 163 | + |
| 164 | +void handleSubscribe() { |
| 165 | + if (subscriptionCount == SSE_MAX_CHANNELS - 1) { |
| 166 | + return handleNotFound(); // We ran out of channels |
| 167 | + } |
| 168 | + |
| 169 | + uint8_t channel; |
| 170 | + IPAddress clientIP = server.client().remoteIP(); // get IP address of client |
| 171 | + String SSEurl = F("http://"); |
| 172 | + SSEurl += WiFi.localIP().toString(); |
| 173 | + SSEurl += F(":"); |
| 174 | + SSEurl += port; |
| 175 | + size_t offset = SSEurl.length(); |
| 176 | + SSEurl += F("/rest/events/"); |
| 177 | + |
| 178 | + ++subscriptionCount; |
| 179 | + for (channel = 0; channel < SSE_MAX_CHANNELS; channel++) // Find first free slot |
| 180 | + if (!subscription[channel].clientIP) { |
| 181 | + break; |
| 182 | + } |
| 183 | + subscription[channel] = {clientIP, server.client(), Ticker()}; |
| 184 | + SSEurl += channel; |
| 185 | + Serial.printf_P(PSTR("Allocated channel %d, on uri %s\n"), channel, SSEurl.substring(offset).c_str()); |
| 186 | + //server.on(SSEurl.substring(offset), std::bind(SSEHandler, &(subscription[channel]))); |
| 187 | + Serial.printf_P(PSTR("subscription for client IP %s: event bus location: %s\n"), clientIP.toString().c_str(), SSEurl.c_str()); |
| 188 | + server.send_P(200, "text/plain", SSEurl.c_str()); |
| 189 | +} |
| 190 | + |
| 191 | +void startServers() { |
| 192 | + server.on(F("/rest/events/subscribe"), handleSubscribe); |
| 193 | + server.onNotFound(handleAll); |
| 194 | + server.begin(); |
| 195 | + Serial.println("HTTP server and SSE EventSource started"); |
| 196 | +} |
| 197 | + |
| 198 | +void setup(void) { |
| 199 | + Serial.begin(115200); |
| 200 | + WiFi.mode(WIFI_STA); |
| 201 | + WiFi.begin(ssid, password); |
| 202 | + Serial.println(""); |
| 203 | + while (WiFi.status() != WL_CONNECTED) { // Wait for connection |
| 204 | + delay(500); |
| 205 | + Serial.print("."); |
| 206 | + } |
| 207 | + Serial.printf_P(PSTR("\nConnected to %s with IP address: %s\n"), ssid, WiFi.localIP().toString().c_str()); |
| 208 | + if (MDNS.begin("esp8266")) { |
| 209 | + Serial.println("MDNS responder started"); |
| 210 | + } |
| 211 | + |
| 212 | + startServers(); // start web and SSE servers |
| 213 | + sensor[0].name = "sensorA"; |
| 214 | + sensor[1].name = "sensorB"; |
| 215 | + updateSensor(sensor[0]); |
| 216 | + updateSensor(sensor[1]); |
| 217 | +} |
| 218 | + |
| 219 | +void loop(void) { |
| 220 | + server.handleClient(); |
| 221 | + MDNS.update(); |
| 222 | + yield(); |
| 223 | +} |
0 commit comments