1 /**
2   Manages the Discord websocket client.
3 */
4 module dscord.gateway.client;
5 
6 import std.stdio,
7        std.uni,
8        std.functional,
9        std.zlib,
10        std.datetime,
11        std.variant,
12        std.format;
13 
14 static import std.typecons;
15 
16 import vibe.core.core,
17        vibe.inet.url,
18        vibe.http.websockets;
19 
20 import dscord.client,
21        dscord.gateway,
22        dscord.util.emitter,
23        dscord.util.json,
24        dscord.util.counter;
25 
26 /** Maximum reconnects the GatewayClient will try before resetting session state */
27 const ubyte MAX_RECONNECTS = 6;
28 
29 /** Current implemented Gateway version. */
30 const ubyte GATEWAY_VERSION = 6;
31 
32 /**
33   GatewayClient is the base abstraction for connecting to, and interacting with
34   the Discord Websocket (gateway) API.
35 */
36 class GatewayClient {
37   /** Client instance for this gateway connection */
38   Client     client;
39 
40   /** WebSocket connection for this gateway connection */
41   WebSocket  sock;
42 
43   /** Gateway SessionID, used for resuming. */
44   string  sessionID;
45 
46   /** Gateway sequence number, used for resuming */
47   uint    seq;
48 
49   /** Heartbeat interval */
50   uint    heartbeatInterval;
51 
52   /** Whether this GatewayClient is currently connected */
53   bool    connected;
54 
55   /** Number of reconnects attempted */
56   ubyte   reconnects;
57 
58   /** The heartbeater task */
59   Task    heartbeater;
60 
61   /** Event emitter for Gateway Packets */
62   Emitter  eventEmitter;
63 
64   private {
65     /** Cached gateway URL from the API */
66     string  cachedGatewayURL;
67     Counter!string eventCounter;
68     bool eventTracking;
69   }
70 
71   /**
72     Params:
73       client = base client
74       eventTracking = if true, log information about events recieved
75   */
76   this(Client client, bool eventTracking = false) {
77     this.client = client;
78     this.eventTracking = eventTracking;
79 
80     // Create the event emitter and listen to some required gateway events.
81     this.eventEmitter = new Emitter;
82     this.eventEmitter.listen!Ready(toDelegate(&this.handleReadyEvent));
83     this.eventEmitter.listen!Resumed(toDelegate(&this.handleResumedEvent));
84 
85     // Copy emitters to client for easier API access
86     client.events = this.eventEmitter;
87 
88     if (this.eventTracking) {
89       this.eventCounter = new Counter!string;
90     }
91   }
92 
93   /**
94     Logger for this GatewayClient.
95   */
96   @property Logger log() {
97     return this.client.log;
98   }
99 
100   /**
101     Starts a connection to the gateway. Also called for resuming/reconnecting.
102   */
103   void start() {
104     if (this.sock && this.sock.connected) this.sock.close();
105 
106     // If this is our first connection, get a gateway WS URL
107     if (!this.cachedGatewayURL) {
108       this.cachedGatewayURL = client.api.gatewayGet();
109       this.cachedGatewayURL ~= format("/?v=%s&encoding=%s", GATEWAY_VERSION, "json");
110     }
111 
112     // Start the main task
113     this.log.infof("Starting connection to Gateway WebSocket (%s)", this.cachedGatewayURL);
114     this.sock = connectWebSocket(URL(this.cachedGatewayURL));
115     runTask(toDelegate(&this.run));
116   }
117 
118   /**
119     Send a gateway payload.
120   */
121   void send(Serializable p) {
122     string data = p.serialize().toString;
123     version (DEBUG_GATEWAY_DATA) {
124       this.log.tracef("GATEWAY SEND: %s", data);
125     }
126     this.sock.send(data);
127   }
128 
129   private void debugEventCounts() {
130     while (true) {
131       this.eventCounter.resetAll();
132       sleep(5.seconds);
133       this.log.infof("%s total events", this.eventCounter.total);
134 
135       foreach (ref event; this.eventCounter.mostCommon(5)) {
136         this.log.infof("  %s: %s", event, this.eventCounter.get(event));
137       }
138     }
139   }
140 
141   private void handleReadyEvent(Ready  r) {
142     this.log.infof("Recieved READY payload, starting heartbeater");
143     // this.hb_interval = r.heartbeatInterval;
144     this.sessionID = r.sessionID;
145     this.reconnects = 0;
146 
147     if (this.eventTracking) {
148       runTask(toDelegate(&this.debugEventCounts));
149     }
150   }
151 
152   private void handleResumedEvent(Resumed r) {
153     this.heartbeater = runTask(toDelegate(&this.heartbeat));
154   }
155 
156   private void emitDispatchEvent(T)(VibeJSON obj) {
157     T v = new T(this.client, obj["d"]);
158     this.eventEmitter.emit!T(v);
159     v.resolveDeferreds();
160     // TODO: determine if we really need to destory things here
161     // v.destroy();
162   }
163 
164   private void handleDispatchPacket(VibeJSON obj, size_t size) {
165     // Update sequence number if it's larger than what we have
166     uint seq = obj["s"].get!uint;
167     if (seq > this.seq) {
168       this.seq = seq;
169     }
170 
171     string type = obj["t"].get!string;
172 
173     if (this.eventTracking) {
174       this.eventCounter.tick(type);
175     }
176 
177     switch (type) {
178       case "READY":
179         this.log.infof("Recieved READY payload, size in bytes: %s", size);
180         this.emitDispatchEvent!Ready(obj);
181         break;
182       case "RESUMED":
183         this.emitDispatchEvent!Resumed(obj);
184         break;
185       case "CHANNEL_CREATE":
186         this.emitDispatchEvent!ChannelCreate(obj);
187         break;
188       case "CHANNEL_UPDATE":
189         this.emitDispatchEvent!ChannelUpdate(obj);
190         break;
191       case "CHANNEL_DELETE":
192         this.emitDispatchEvent!ChannelDelete(obj);
193         break;
194       case "GUILD_BAN_ADD":
195         this.emitDispatchEvent!GuildBanAdd(obj);
196         break;
197       case "GUILD_BAN_REMOVE":
198         this.emitDispatchEvent!GuildBanRemove(obj);
199         break;
200       case "GUILD_CREATE":
201         this.emitDispatchEvent!GuildCreate(obj);
202         break;
203       case "GUILD_UPDATE":
204         this.emitDispatchEvent!GuildUpdate(obj);
205         break;
206       case "GUILD_DELETE":
207         this.emitDispatchEvent!GuildDelete(obj);
208         break;
209       case "GUILD_EMOJIS_UPDATE":
210         this.emitDispatchEvent!GuildEmojisUpdate(obj);
211         break;
212       case "GUILD_INTEGRATIONS_UPDATE":
213         this.emitDispatchEvent!GuildIntegrationsUpdate(obj);
214         break;
215       case "GUILD_MEMBERS_CHUNK":
216         this.emitDispatchEvent!GuildMembersChunk(obj);
217         break;
218       case "GUILD_MEMBER_ADD":
219         this.emitDispatchEvent!GuildMemberAdd(obj);
220         break;
221       case "GUILD_MEMBER_UPDATE":
222         this.emitDispatchEvent!GuildMemberUpdate(obj);
223         break;
224       case "GUILD_MEMBER_REMOVE":
225         this.emitDispatchEvent!GuildMemberRemove(obj);
226         break;
227       case "GUILD_ROLE_CREATE":
228         this.emitDispatchEvent!GuildRoleCreate(obj);
229         break;
230       case "GUILD_ROLE_UPDATE":
231         this.emitDispatchEvent!GuildRoleUpdate(obj);
232         break;
233       case "GUILD_ROLE_DELETE":
234         this.emitDispatchEvent!GuildRoleDelete(obj);
235         break;
236       case "MESSAGE_CREATE":
237         this.emitDispatchEvent!MessageCreate(obj);
238         break;
239       case "MESSAGE_UPDATE":
240         this.emitDispatchEvent!MessageUpdate(obj);
241         break;
242       case "MESSAGE_DELETE":
243         this.emitDispatchEvent!MessageDelete(obj);
244         break;
245       case "PRESENCE_UPDATE":
246         this.emitDispatchEvent!PresenceUpdate(obj);
247         break;
248       case "TYPING_START":
249         this.emitDispatchEvent!TypingStart(obj);
250         break;
251       case "USER_SETTINGS_UPDATE":
252         this.emitDispatchEvent!UserSettingsUpdate(obj);
253         break;
254       case "USER_UPDATE":
255         this.emitDispatchEvent!UserUpdate(obj);
256         break;
257       case "VOICE_STATE_UPDATE":
258         this.emitDispatchEvent!VoiceStateUpdate(obj);
259         break;
260       case "VOICE_SERVER_UPDATE":
261         this.emitDispatchEvent!VoiceServerUpdate(obj);
262         break;
263       case "CHANNEL_PINS_UPDATE":
264         this.emitDispatchEvent!ChannelPinsUpdate(obj);
265         break;
266       case "MESSAGE_DELETE_BULK":
267         this.emitDispatchEvent!MessageDeleteBulk(obj);
268         break;
269       default:
270         this.log.warningf("Unhandled dispatch event: %s", type);
271         break;
272     }
273   }
274 
275   private void parse(string rawData) {
276     VibeJSON json = parseJsonString(rawData);
277 
278     version (DEBUG_GATEWAY_DATA) {
279       this.log.tracef("GATEWAY RECV: %s", rawData);
280     }
281 
282     OPCode op = json["op"].get!OPCode;
283 
284     switch (op) {
285       case OPCode.DISPATCH:
286         this.handleDispatchPacket(json, rawData.length);
287         break;
288       case OPCode.HEARTBEAT:
289         this.send(new HeartbeatPacket(this.seq));
290         break;
291       case OPCode.RECONNECT:
292         this.log.warningf("Recieved RECONNECT OPCode, resetting connection...");
293         if (this.sock && this.sock.connected) this.sock.close();
294         break;
295       case OPCode.INVALID_SESSION:
296         this.log.warningf("Recieved INVALID_SESSION OPCode, resetting connection...");
297         if (this.sock && this.sock.connected) this.sock.close();
298         break;
299       case OPCode.HELLO:
300         this.log.tracef("Recieved HELLO OPCode, starting heartbeatter...");
301         this.heartbeatInterval = json["d"]["heartbeat_interval"].get!uint;
302         this.heartbeater = runTask(toDelegate(&this.heartbeat));
303         break;
304       case OPCode.HEARTBEAT_ACK:
305         break;
306       default:
307         this.log.warningf("Unhandled gateway packet: %s", op);
308         break;
309     }
310   }
311 
312   private void heartbeat() {
313     while (this.connected) {
314       this.send(new HeartbeatPacket(this.seq));
315       sleep(this.heartbeatInterval.msecs);
316     }
317   }
318 
319   /**
320     Runs the GatewayClient until completion.
321   */
322   void run() {
323     string data;
324 
325     // If we already have a sequence number, attempt to resume
326     if (this.sessionID && this.seq) {
327       this.log.infof("Sending Resume Payload (we where %s at %s)", this.sessionID, this.seq);
328       this.send(new ResumePacket(this.client.token, this.sessionID, this.seq));
329     } else {
330       // On startup, send the identify payload
331       this.log.info("Sending Identify Payload");
332       this.send(new IdentifyPacket(
333           this.client.token,
334           this.client.shardInfo.shard,
335           this.client.shardInfo.numShards));
336     }
337 
338     this.log.info("Connected to Gateway");
339     this.connected = true;
340 
341     while (this.sock.waitForData()) {
342       if (!this.connected) break;
343 
344       try {
345         ubyte[] rawdata = this.sock.receiveBinary();
346         data = cast(string)uncompress(rawdata);
347       } catch (Exception e) {
348         data = this.sock.receiveText();
349       }
350 
351       if (data == "") {
352         continue;
353       }
354 
355       try {
356         this.parse(data);
357       } catch (Exception e) {
358         this.log.warningf("failed to handle %s (%s)", e, data);
359       } catch (Error e) {
360         this.log.warningf("failed to handle %s (%s)", e, data);
361       }
362     }
363 
364     this.log.critical("Gateway websocket closed");
365     this.connected = false;
366     this.reconnects++;
367 
368     if (this.reconnects > MAX_RECONNECTS) {
369       this.log.errorf("Max Gateway WS reconnects (%s) hit, aborting...", this.reconnects);
370       return;
371     }
372 
373     if (this.reconnects > 1) {
374       this.sessionID = null;
375       this.seq = 0;
376       this.log.warning("Waiting 5 seconds before reconnecting...");
377       sleep(5.seconds);
378     }
379 
380     this.log.info("Attempting reconnection...");
381     return this.start();
382   }
383 }