/**
  Manages the Discord websocket client.
*/
module dscord.gateway.client;

import std.stdio,
       std.uni,
       std.functional,
       std.zlib,
       std.datetime,
       std.variant,
       std.format;

static import std.typecons;

import vibe.core.core,
       vibe.inet.url,
       vibe.http.websockets;

import dscord.client,
       dscord.gateway,
       dscord.util.emitter,
       dscord.util.json,
       dscord.util.counter;

/** Maximum reconnects the GatewayClient will try before giving up entirely. */
const ubyte MAX_RECONNECTS = 6;

/** Currently implemented Gateway version. */
const ubyte GATEWAY_VERSION = 6;

/**
  GatewayClient is the base abstraction for connecting to, and interacting with
  the Discord Websocket (gateway) API.

  A connection is opened by `start`, which spawns `run` as a task. `run` reads
  packets until the socket closes and then either reconnects (through `start`)
  or gives up once `MAX_RECONNECTS` has been exceeded.
*/
class GatewayClient {
  /** Client instance for this gateway connection */
  Client     client;

  /** WebSocket connection for this gateway connection */
  WebSocket  sock;

  /** Gateway SessionID, used for resuming. */
  string  sessionID;

  /** Gateway sequence number, used for resuming */
  uint    seq;

  /** Heartbeat interval in milliseconds, as announced by the HELLO packet */
  uint    heartbeatInterval;

  /** Whether this GatewayClient is currently connected */
  bool    connected;

  /** Number of reconnects attempted */
  ubyte   reconnects;

  /** The most recently started heartbeater task */
  Task    heartbeater;

  /** Event emitter for Gateway Packets */
  Emitter  eventEmitter;

  private {
    /** Cached gateway URL from the API */
    string          cachedGatewayURL;

    /** Per-event-type counter, only allocated when eventTracking is enabled */
    Counter!string  eventCounter;

    /** Whether event statistics are collected and periodically logged */
    bool            eventTracking;

    /** Set once the event statistics task has been started, so it is started only once */
    bool            eventCounterStarted;

    /**
      Incremented every time a heartbeater is started and every time the
      connection drops. A heartbeat loop exits as soon as the value it captured
      at start no longer matches, so at most one loop stays alive.
    */
    uint            heartbeatGeneration;
  }

  /**
    Params:
      client = base client
      eventTracking = if true, log information about events received
  */
  this(Client client, bool eventTracking = false) {
    this.client = client;
    this.eventTracking = eventTracking;

    // Create the event emitter and listen to some required gateway events.
    this.eventEmitter = new Emitter;
    this.eventEmitter.listen!Ready(toDelegate(&this.handleReadyEvent));
    this.eventEmitter.listen!Resumed(toDelegate(&this.handleResumedEvent));

    // Copy emitters to client for easier API access
    client.events = this.eventEmitter;

    if (this.eventTracking) {
      this.eventCounter = new Counter!string;
    }
  }

  /**
    Logger for this GatewayClient (the owning client's logger).
  */
  @property Logger log() {
    return this.client.log;
  }

  /**
    Starts a connection to the gateway. Also called for resuming/reconnecting.

    Closes a still-connected previous socket, resolves (and caches) the gateway
    URL on first use, connects, and spawns `run` as a task.
  */
  void start() {
    if (this.sock && this.sock.connected) this.sock.close();

    // If this is our first connection, get a gateway WS URL
    if (!this.cachedGatewayURL) {
      this.cachedGatewayURL = client.api.gatewayGet();
      this.cachedGatewayURL ~= format("/?v=%s&encoding=%s", GATEWAY_VERSION, "json");
    }

    // Start the main task
    this.log.infof("Starting connection to Gateway WebSocket (%s)", this.cachedGatewayURL);
    this.sock = connectWebSocket(URL(this.cachedGatewayURL));
    runTask(toDelegate(&this.run));
  }

  /**
    Send a gateway payload.

    Params:
      p = payload, serialized to a JSON string and sent over the websocket
  */
  void send(Serializable p) {
    string data = p.serialize().toString;
    version (DEBUG_GATEWAY_DATA) {
      this.log.tracef("GATEWAY SEND: %s", data);
    }
    this.sock.send(data);
  }

  /**
    Never-ending loop which, every five seconds, logs the total number of
    events seen in that window along with the five most common event types.
  */
  private void debugEventCounts() {
    while (true) {
      this.eventCounter.resetAll();
      sleep(5.seconds);
      this.log.infof("%s total events", this.eventCounter.total);

      foreach (ref event; this.eventCounter.mostCommon(5)) {
        this.log.infof(" %s: %s", event, this.eventCounter.get(event));
      }
    }
  }

  /**
    Handles READY: stores the new session ID and clears the reconnect counter.
  */
  private void handleReadyEvent(Ready r) {
    this.log.infof("Recieved READY payload, starting heartbeater");
    // this.hb_interval = r.heartbeatInterval;
    this.sessionID = r.sessionID;
    this.reconnects = 0;

    // READY arrives again after every re-identify; the statistics task loops
    // forever, so it must only ever be started once.
    if (this.eventTracking && !this.eventCounterStarted) {
      this.eventCounterStarted = true;
      runTask(toDelegate(&this.debugEventCounts));
    }
  }

  /**
    Handles RESUMED by (re)starting the heartbeater. Any loop started earlier
    (e.g. by HELLO) is superseded rather than left running next to the new one.
  */
  private void handleResumedEvent(Resumed r) {
    this.startHeartbeater();
  }

  /**
    Starts a new heartbeat task and invalidates every previously started one.
  */
  private void startHeartbeater() {
    this.heartbeatGeneration++;
    this.heartbeater = runTask(toDelegate(&this.heartbeat));
  }

  /**
    Builds the event object of type T from the packet's "d" field, emits it and
    then resolves its deferred actions.
  */
  private void emitDispatchEvent(T)(VibeJSON obj) {
    T v = new T(this.client, obj["d"]);
    this.eventEmitter.emit!T(v);
    v.resolveDeferreds();
    // TODO: determine if we really need to destroy things here
    // v.destroy();
  }

  /**
    Handles a DISPATCH packet: tracks the sequence number, counts the event (if
    tracking is enabled) and emits the typed event matching the packet's "t".

    Params:
      obj = the decoded packet
      size = length of the raw packet text, only used for logging
  */
  private void handleDispatchPacket(VibeJSON obj, size_t size) {
    // Update sequence number if it's larger than what we have
    uint seq = obj["s"].get!uint;
    if (seq > this.seq) {
      this.seq = seq;
    }

    string type = obj["t"].get!string;

    if (this.eventTracking) {
      this.eventCounter.tick(type);
    }

    switch (type) {
      case "READY":
        this.log.infof("Recieved READY payload, size in bytes: %s", size);
        this.emitDispatchEvent!Ready(obj);
        break;
      case "RESUMED":
        this.emitDispatchEvent!Resumed(obj);
        break;
      case "CHANNEL_CREATE":
        this.emitDispatchEvent!ChannelCreate(obj);
        break;
      case "CHANNEL_UPDATE":
        this.emitDispatchEvent!ChannelUpdate(obj);
        break;
      case "CHANNEL_DELETE":
        this.emitDispatchEvent!ChannelDelete(obj);
        break;
      case "GUILD_BAN_ADD":
        this.emitDispatchEvent!GuildBanAdd(obj);
        break;
      case "GUILD_BAN_REMOVE":
        this.emitDispatchEvent!GuildBanRemove(obj);
        break;
      case "GUILD_CREATE":
        this.emitDispatchEvent!GuildCreate(obj);
        break;
      case "GUILD_UPDATE":
        this.emitDispatchEvent!GuildUpdate(obj);
        break;
      case "GUILD_DELETE":
        this.emitDispatchEvent!GuildDelete(obj);
        break;
      case "GUILD_EMOJIS_UPDATE":
        this.emitDispatchEvent!GuildEmojisUpdate(obj);
        break;
      case "GUILD_INTEGRATIONS_UPDATE":
        this.emitDispatchEvent!GuildIntegrationsUpdate(obj);
        break;
      case "GUILD_MEMBERS_CHUNK":
        this.emitDispatchEvent!GuildMembersChunk(obj);
        break;
      case "GUILD_MEMBER_ADD":
        this.emitDispatchEvent!GuildMemberAdd(obj);
        break;
      case "GUILD_MEMBER_UPDATE":
        this.emitDispatchEvent!GuildMemberUpdate(obj);
        break;
      case "GUILD_MEMBER_REMOVE":
        this.emitDispatchEvent!GuildMemberRemove(obj);
        break;
      case "GUILD_ROLE_CREATE":
        this.emitDispatchEvent!GuildRoleCreate(obj);
        break;
      case "GUILD_ROLE_UPDATE":
        this.emitDispatchEvent!GuildRoleUpdate(obj);
        break;
      case "GUILD_ROLE_DELETE":
        this.emitDispatchEvent!GuildRoleDelete(obj);
        break;
      case "MESSAGE_CREATE":
        this.emitDispatchEvent!MessageCreate(obj);
        break;
      case "MESSAGE_UPDATE":
        this.emitDispatchEvent!MessageUpdate(obj);
        break;
      case "MESSAGE_DELETE":
        this.emitDispatchEvent!MessageDelete(obj);
        break;
      case "PRESENCE_UPDATE":
        this.emitDispatchEvent!PresenceUpdate(obj);
        break;
      case "TYPING_START":
        this.emitDispatchEvent!TypingStart(obj);
        break;
      case "USER_SETTINGS_UPDATE":
        this.emitDispatchEvent!UserSettingsUpdate(obj);
        break;
      case "USER_UPDATE":
        this.emitDispatchEvent!UserUpdate(obj);
        break;
      case "VOICE_STATE_UPDATE":
        this.emitDispatchEvent!VoiceStateUpdate(obj);
        break;
      case "VOICE_SERVER_UPDATE":
        this.emitDispatchEvent!VoiceServerUpdate(obj);
        break;
      case "CHANNEL_PINS_UPDATE":
        this.emitDispatchEvent!ChannelPinsUpdate(obj);
        break;
      case "MESSAGE_DELETE_BULK":
        this.emitDispatchEvent!MessageDeleteBulk(obj);
        break;
      default:
        this.log.warningf("Unhandled dispatch event: %s", type);
        break;
    }
  }

  /**
    Decodes one raw gateway packet and acts on its opcode.

    Params:
      rawData = JSON text of the packet
  */
  private void parse(string rawData) {
    VibeJSON json = parseJsonString(rawData);

    version (DEBUG_GATEWAY_DATA) {
      this.log.tracef("GATEWAY RECV: %s", rawData);
    }

    OPCode op = json["op"].get!OPCode;

    switch (op) {
      case OPCode.DISPATCH:
        this.handleDispatchPacket(json, rawData.length);
        break;
      case OPCode.HEARTBEAT:
        this.send(new HeartbeatPacket(this.seq));
        break;
      case OPCode.RECONNECT:
        this.log.warningf("Recieved RECONNECT OPCode, resetting connection...");
        if (this.sock && this.sock.connected) this.sock.close();
        break;
      case OPCode.INVALID_SESSION:
        this.log.warningf("Recieved INVALID_SESSION OPCode, resetting connection...");
        // The payload's `d` flag says whether the session may still be resumed.
        // Unless it is explicitly true, drop the session state so that the
        // reconnect identifies instead of resuming the rejected session again.
        if (!json["d"].opt!bool(false)) {
          this.sessionID = null;
          this.seq = 0;
        }
        if (this.sock && this.sock.connected) this.sock.close();
        break;
      case OPCode.HELLO:
        this.log.tracef("Recieved HELLO OPCode, starting heartbeatter...");
        this.heartbeatInterval = json["d"]["heartbeat_interval"].get!uint;
        this.startHeartbeater();
        break;
      case OPCode.HEARTBEAT_ACK:
        break;
      default:
        this.log.warningf("Unhandled gateway packet: %s", op);
        break;
    }
  }

  /**
    Heartbeat loop: sends a heartbeat carrying the current sequence number, then
    sleeps for `heartbeatInterval` milliseconds. The loop ends when the client
    is no longer connected or when it has been superseded.
  */
  private void heartbeat() {
    // Remember which generation this loop belongs to. Without this a loop that
    // sleeps through a disconnect would wake up after the reconnect, see
    // `connected` set again and keep running next to the new heartbeater.
    immutable generation = this.heartbeatGeneration;

    while (this.connected && generation == this.heartbeatGeneration) {
      this.send(new HeartbeatPacket(this.seq));
      sleep(this.heartbeatInterval.msecs);
    }
  }

  /**
    Runs the GatewayClient until completion.

    Sends a resume (when both a session ID and a sequence number are known) or
    an identify payload, then processes incoming packets until the socket
    closes. Afterwards a reconnect is attempted through `start`, unless more
    than `MAX_RECONNECTS` attempts have been made.
  */
  void run() {
    // If we already have a sequence number, attempt to resume
    if (this.sessionID && this.seq) {
      this.log.infof("Sending Resume Payload (we where %s at %s)", this.sessionID, this.seq);
      this.send(new ResumePacket(this.client.token, this.sessionID, this.seq));
    } else {
      // A fresh session starts from scratch; a stale sequence number would
      // otherwise mask the (lower) sequence numbers of the new session.
      this.sessionID = null;
      this.seq = 0;

      // On startup, send the identify payload
      this.log.info("Sending Identify Payload");
      this.send(new IdentifyPacket(
          this.client.token,
          this.client.shardInfo.shard,
          this.client.shardInfo.numShards));
    }

    this.log.info("Connected to Gateway");
    this.connected = true;

    while (this.sock.waitForData()) {
      if (!this.connected) break;

      string data;

      // Try reading a (zlib compressed) binary message first and fall back to
      // reading a plain text message if that fails.
      try {
        ubyte[] rawdata = this.sock.receiveBinary();
        data = cast(string)uncompress(rawdata);
      } catch (Exception e) {
        data = this.sock.receiveText();
      }

      if (data == "") {
        continue;
      }

      // Best effort: a single bad packet must not take the connection down.
      try {
        this.parse(data);
      } catch (Exception e) {
        this.log.warningf("failed to handle %s (%s)", e, data);
      } catch (Error e) {
        this.log.warningf("failed to handle %s (%s)", e, data);
      }
    }

    this.log.critical("Gateway websocket closed");
    this.connected = false;
    // Retire the heartbeater that belonged to the closed connection.
    this.heartbeatGeneration++;
    this.reconnects++;

    if (this.reconnects > MAX_RECONNECTS) {
      this.log.errorf("Max Gateway WS reconnects (%s) hit, aborting...", this.reconnects);
      return;
    }

    // From the second consecutive attempt on, give up on resuming and back off.
    if (this.reconnects > 1) {
      this.sessionID = null;
      this.seq = 0;
      this.log.warning("Waiting 5 seconds before reconnecting...");
      sleep(5.seconds);
    }

    this.log.info("Attempting reconnection...");
    this.start();
  }
}