module dscord.gateway.client;

import std.stdio,
       std.uni,
       std.functional,
       std.zlib,
       std.datetime,
       std.variant;

import vibe.core.core,
       vibe.inet.url,
       vibe.http.websockets;

import dscord.client,
       dscord.gateway.packets,
       dscord.gateway.events,
       dscord.util.emitter,
       dscord.util.json;

/**
  Maximum number of consecutive reconnect attempts the GatewayClient makes
  before it gives up and stops reconnecting. (Session state is dropped earlier
  than this: from the second consecutive reconnect onwards.)
*/
const ubyte MAX_RECONNECTS = 6;

/**
  GatewayClient is the base abstraction for connecting to, and interacting with
  the Discord Websocket (gateway) API.
*/
class GatewayClient {
  /** Client instance for this gateway connection */
  Client client;

  /** WebSocket connection for this gateway connection */
  WebSocket sock;

  /** Gateway SessionID, used for resuming. */
  string sessionID;

  /** Gateway sequence number, used for resuming */
  uint seq;

  /** Heartbeat interval, in milliseconds */
  uint hb_interval;

  /** Whether this GatewayClient is currently connected */
  bool connected;

  /** Number of reconnects attempted since the last READY/RESUMED */
  ubyte reconnects;

  /** The heartbeater task */
  Task heartbeater;

  /** Event emitter for Gateway Packets */
  Emitter eventEmitter;

  private {
    /** Cached gateway URL from the API */
    string cachedGatewayURL;

    /**
      Generation counter for heartbeat tasks. It is bumped every time a new
      heartbeater is started and every time the connection drops; a heartbeat
      task exits as soon as the value no longer matches the one it started
      with. This prevents heartbeaters from piling up across reconnects.
    */
    uint heartbeatGeneration;
  }

  /**
    Creates a GatewayClient bound to `client`, registers the READY/RESUMED
    handlers and exposes the event emitter as `client.events`.
  */
  this(Client client) {
    this.client = client;

    // Create the event emitter and listen to some required gateway events.
    this.eventEmitter = new Emitter;
    this.eventEmitter.listen!Ready(toDelegate(&this.handleReadyEvent));
    this.eventEmitter.listen!Resumed(toDelegate(&this.handleResumedEvent));

    // Copy emitters to client for easier API access
    client.events = this.eventEmitter;
  }

  /**
    Logger for this GatewayClient.
  */
  @property Logger log() {
    return this.client.log;
  }

  /**
    Starts a connection to the gateway. Also called for resuming/reconnecting.

    Closes any socket that is still connected, resolves (and caches) the
    gateway URL on first use, opens the websocket and spawns `run` as a task.
  */
  void start() {
    if (this.sock && this.sock.connected) this.sock.close();

    // If this is our first connection, get a gateway WS URL
    if (!this.cachedGatewayURL) {
      this.cachedGatewayURL = client.api.gateway();
    }

    // Start the main task
    this.log.infof("Starting connection to Gateway WebSocket (%s)", this.cachedGatewayURL);
    this.sock = connectWebSocket(URL(this.cachedGatewayURL));
    runTask(toDelegate(&this.run));
  }

  /**
    Send a gateway payload.

    The payload is serialized to JSON, traced to the log and written to the
    websocket as text.
  */
  void send(Serializable p) {
    JSONValue data = p.serialize();
    this.log.tracef("gateway-send: %s", data.toString);
    this.sock.send(data.toString);
  }

  /**
    Starts a fresh heartbeat task, superseding any heartbeater that is still
    alive from a previous READY/RESUMED.
  */
  private void startHeartbeater() {
    // Bump first so that the new task captures the new generation and every
    // older task sees a mismatch the next time it wakes up.
    this.heartbeatGeneration++;
    this.heartbeater = runTask(toDelegate(&this.heartbeat));
  }

  /** Handles READY: stores the session parameters and starts heartbeating. */
  private void handleReadyEvent(Ready r) {
    this.log.infof("Recieved READY payload, starting heartbeater");
    this.hb_interval = r.heartbeatInterval;
    this.sessionID = r.sessionID;
    this.startHeartbeater();
    this.reconnects = 0;
  }

  /** Handles RESUMED: restarts heartbeating on the resumed session. */
  private void handleResumedEvent(Resumed r) {
    this.startHeartbeater();
    // A successful resume is a successful connection, same as READY, so it
    // must not keep counting against the reconnect budget.
    this.reconnects = 0;
  }

  /**
    Constructs an event of type `T` from the payload, emits it to listeners,
    resolves its deferreds and then destroys it.
  */
  private void emitDispatchEvent(T)(ref JSON obj) {
    T v = new T(this.client, obj);
    this.eventEmitter.emit!T(v);
    v.resolveDeferreds();
    v.destroy();
  }

  /**
    Handles a DISPATCH packet: tracks the sequence number and emits the event
    matching the dispatch type name. Unknown types are logged as warnings.
  */
  private void handleDispatchPacket(uint seq, string type, ref JSON obj) {
    // Update sequence number if it's larger than what we have
    if (seq > this.seq) {
      this.seq = seq;
    }

    switch (type) {
      case "READY":
        this.emitDispatchEvent!Ready(obj);
        break;
      case "RESUMED":
        this.emitDispatchEvent!Resumed(obj);
        break;
      case "CHANNEL_CREATE":
        this.emitDispatchEvent!ChannelCreate(obj);
        break;
      case "CHANNEL_UPDATE":
        this.emitDispatchEvent!ChannelUpdate(obj);
        break;
      case "CHANNEL_DELETE":
        this.emitDispatchEvent!ChannelDelete(obj);
        break;
      case "GUILD_BAN_ADD":
        this.emitDispatchEvent!GuildBanAdd(obj);
        break;
      case "GUILD_BAN_REMOVE":
        this.emitDispatchEvent!GuildBanRemove(obj);
        break;
      case "GUILD_CREATE":
        this.emitDispatchEvent!GuildCreate(obj);
        break;
      case "GUILD_UPDATE":
        this.emitDispatchEvent!GuildUpdate(obj);
        break;
      case "GUILD_DELETE":
        this.emitDispatchEvent!GuildDelete(obj);
        break;
      case "GUILD_EMOJIS_UPDATE":
        this.emitDispatchEvent!GuildEmojisUpdate(obj);
        break;
      case "GUILD_INTEGRATIONS_UPDATE":
        this.emitDispatchEvent!GuildIntegrationsUpdate(obj);
        break;
      case "GUILD_MEMBER_ADD":
        this.emitDispatchEvent!GuildMemberAdd(obj);
        break;
      case "GUILD_MEMBER_UPDATE":
        this.emitDispatchEvent!GuildMemberUpdate(obj);
        break;
      case "GUILD_MEMBER_REMOVE":
        this.emitDispatchEvent!GuildMemberRemove(obj);
        break;
      case "GUILD_ROLE_CREATE":
        this.emitDispatchEvent!GuildRoleCreate(obj);
        break;
      case "GUILD_ROLE_UPDATE":
        this.emitDispatchEvent!GuildRoleUpdate(obj);
        break;
      case "GUILD_ROLE_DELETE":
        this.emitDispatchEvent!GuildRoleDelete(obj);
        break;
      case "MESSAGE_CREATE":
        this.emitDispatchEvent!MessageCreate(obj);
        break;
      case "MESSAGE_UPDATE":
        this.emitDispatchEvent!MessageUpdate(obj);
        break;
      case "MESSAGE_DELETE":
        this.emitDispatchEvent!MessageDelete(obj);
        break;
      case "PRESENCE_UPDATE":
        this.emitDispatchEvent!PresenceUpdate(obj);
        break;
      case "TYPING_START":
        this.emitDispatchEvent!TypingStart(obj);
        break;
      case "USER_SETTINGS_UPDATE":
        this.emitDispatchEvent!UserSettingsUpdate(obj);
        break;
      case "USER_UPDATE":
        this.emitDispatchEvent!UserUpdate(obj);
        break;
      case "VOICE_STATE_UPDATE":
        this.emitDispatchEvent!VoiceStateUpdate(obj);
        break;
      case "VOICE_SERVER_UPDATE":
        this.emitDispatchEvent!VoiceServerUpdate(obj);
        break;
      default:
        this.log.warningf("Unhandled dispatch event: %s", type);
        break;
    }
  }

  /**
    Parses one raw gateway message and routes it by opcode.

    Keys are processed in the order they appear in the message: "op", "t" and
    "s" are stored as they are seen, and the payload is dispatched when "d" is
    reached, using whatever has been stored up to that point.
  */
  private void parse(string rawData) {
    auto json = parseTrustedJSON(rawData);

    uint seq;
    string type;
    OPCode op;

    // Scan over each key, store any extra information until we hit the data payload
    foreach (key; json.byKey) {
      switch (key) {
        case "op":
          op = cast(OPCode)json.read!ushort;
          break;
        case "t":
          type = json.read!string;
          break;
        case "s":
          seq = json.read!uint;
          break;
        case "d":
          // NOTE(review): this relies on "op"/"t"/"s" appearing before "d" in
          // the message; if "d" comes first, `op` still holds its initial value.
          switch (op) {
            case OPCode.DISPATCH:
              this.handleDispatchPacket(seq, type, json);
              break;
            default:
              this.log.warningf("Unhandled gateway packet: %s", op);
              break;
          }
          break;
        default:
          this.log.tracef("K: %s", key);
          break;
      }
    }
  }

  /**
    Body of a heartbeat task: sends a heartbeat carrying the current sequence
    number, then sleeps for `hb_interval` milliseconds, for as long as the
    client is connected and this task has not been superseded.
  */
  private void heartbeat() {
    // Remember which generation this task belongs to. Checking `connected`
    // alone is not enough: a reconnect can flip it back to true while this
    // task is still asleep, which would leave it running next to the
    // heartbeater started for the new connection.
    immutable generation = this.heartbeatGeneration;

    while (this.connected && generation == this.heartbeatGeneration) {
      this.send(new HeartbeatPacket(this.seq));
      sleep(this.hb_interval.msecs);
    }
  }

  /**
    Runs the GatewayClient until completion.

    Sends a resume payload (when a session ID and sequence number are known)
    or an identify payload, then reads and parses messages until the socket
    stops delivering data. Afterwards it either reconnects through `start` or,
    once more than MAX_RECONNECTS attempts were made, logs an error and
    returns.
  */
  void run() {
    string data;

    // If we already have a sequence number, attempt to resume
    if (this.sessionID && this.seq) {
      this.log.infof("Sending Resume Payload (we where %s at %s)", this.sessionID, this.seq);
      this.send(new ResumePacket(this.client.token, this.sessionID, this.seq));
    } else {
      // On startup, send the identify payload
      this.log.info("Sending Identify Payload");
      this.send(new IdentifyPacket(this.client.token));
    }

    this.log.info("Connected to Gateway");
    this.connected = true;

    while (this.sock.waitForData()) {
      if (!this.connected) break;

      // Try the message as a zlib-compressed binary frame first and fall back
      // to reading it as text.
      // NOTE(review): the fallback also runs when uncompress() itself throws,
      // and an exception from receiveText() is not caught here - confirm this
      // is acceptable against the vibe.d WebSocket semantics.
      try {
        ubyte[] rawdata = this.sock.receiveBinary();
        data = cast(string)uncompress(rawdata);
      } catch (Exception e) {
        data = this.sock.receiveText();
      }

      if (data == "") {
        continue;
      }

      // A single bad packet must not take the whole connection down, so both
      // exceptions and errors from parsing/handlers are logged and skipped.
      try {
        this.parse(data);
      } catch (Exception e) {
        this.log.warningf("failed to handle %s (%s)", e, data);
      } catch (Error e) {
        this.log.warningf("failed to handle %s (%s)", e, data);
      }
    }

    this.log.critical("Gateway websocket closed");
    this.connected = false;
    // Invalidate the heartbeater of the connection that just closed, so it
    // exits when it wakes up even if we are already connected again by then.
    this.heartbeatGeneration++;
    this.reconnects++;

    if (this.reconnects > MAX_RECONNECTS) {
      this.log.errorf("Max Gateway WS reconnects (%s) hit, aborting...", this.reconnects);
      return;
    }

    // From the second consecutive failure on, stop trying to resume: drop the
    // session state so the next attempt identifies from scratch, and back off.
    if (this.reconnects > 1) {
      this.sessionID = null;
      this.seq = 0;
      this.log.warning("Waiting 5 seconds before reconnecting...");
      sleep(5.seconds);
    }

    this.log.info("Attempting reconnection...");
    return this.start();
  }
}