1 module dscord.gateway.client;
2 
3 import std.stdio,
4        std.uni,
5        std.functional,
6        std.zlib,
7        std.datetime,
8        std.variant;
9 
10 import vibe.core.core,
11        vibe.inet.url,
12        vibe.http.websockets;
13 
14 import dscord.client,
15        dscord.gateway.packets,
16        dscord.gateway.events,
17        dscord.util.emitter,
18        dscord.util.json;
19 
20 /** Maximum number of reconnects the GatewayClient will attempt before it gives up and stops reconnecting */
21 const ubyte MAX_RECONNECTS = 6;
22 
23 /**
24   GatewayClient is the base abstraction for connecting to, and interacting with
25   the Discord Websocket (gateway) API.
26 */
class GatewayClient {
  /** Client instance for this gateway connection */
  Client     client;

  /** WebSocket connection for this gateway connection */
  WebSocket  sock;

  /** Gateway SessionID, used for resuming. Set from the READY payload. */
  string  sessionID;

  /** Gateway sequence number, used for resuming and sent with every heartbeat */
  uint    seq;

  /** Heartbeat interval in milliseconds, set from the READY payload */
  uint    hb_interval;

  /** Whether this GatewayClient is currently connected */
  bool    connected;

  /** Number of reconnects attempted since the last READY payload */
  ubyte   reconnects;

  /** The heartbeater task */
  Task    heartbeater;

  /** Event emitter for Gateway Packets */
  Emitter  eventEmitter;

  private {
    /** Cached gateway URL from the API */
    string  cachedGatewayURL;
  }

  /**
    Creates a GatewayClient bound to the given Client.

    Registers the internal READY/RESUMED listeners and exposes the event
    emitter on the client as `client.events`.

    Params:
      client = the Client this gateway connection belongs to
  */
  this(Client client) {
    this.client = client;

    // Create the event emitter and listen to some required gateway events.
    this.eventEmitter = new Emitter;
    this.eventEmitter.listen!Ready(toDelegate(&this.handleReadyEvent));
    this.eventEmitter.listen!Resumed(toDelegate(&this.handleResumedEvent));

    // Copy emitters to client for easier API access
    client.events = this.eventEmitter;
  }

  /**
    Logger for this GatewayClient (the owning client's logger).
  */
  @property Logger log() {
    return this.client.log;
  }

  /**
    Starts a connection to the gateway. Also called for resuming/reconnecting.

    Closes any socket that is still connected, fetches (and caches) the gateway
    URL on first use, opens a new websocket and spawns `run` as a task.
  */
  void start() {
    if (this.sock && this.sock.connected) this.sock.close();

    // If this is our first connection, get a gateway WS URL
    if (!this.cachedGatewayURL) {
      this.cachedGatewayURL = client.api.gateway();
    }

    // Start the main task
    this.log.infof("Starting connection to Gateway WebSocket (%s)", this.cachedGatewayURL);
    this.sock = connectWebSocket(URL(this.cachedGatewayURL));
    runTask(toDelegate(&this.run));
  }

  /**
    Send a gateway payload.

    Params:
      p = payload to serialize to JSON and write to the websocket
  */
  void send(Serializable p) {
    // Serialize once and reuse the text for both the trace log and the socket.
    string payload = p.serialize().toString;
    this.log.tracef("gateway-send: %s", payload);
    this.sock.send(payload);
  }

  /**
    Spawns the heartbeat task unless one is already running.

    A heartbeat task only exits when it wakes up and sees `connected == false`.
    After a fast reconnect the previous task can therefore still be alive, and
    spawning another one would send duplicate heartbeats.
  */
  private void startHeartbeater() {
    // `running` must not be queried on a default-initialized Task handle.
    if (this.heartbeater != Task.init && this.heartbeater.running) {
      return;
    }

    this.heartbeater = runTask(toDelegate(&this.heartbeat));
  }

  /**
    Handles READY: stores the heartbeat interval and session ID, makes sure a
    heartbeater is running and resets the reconnect counter.
  */
  private void handleReadyEvent(Ready  r) {
    this.log.infof("Recieved READY payload, starting heartbeater");
    this.hb_interval = r.heartbeatInterval;
    this.sessionID = r.sessionID;
    this.startHeartbeater();
    this.reconnects = 0;
  }

  /**
    Handles RESUMED: makes sure a heartbeater is running for the resumed session.
  */
  private void handleResumedEvent(Resumed r) {
    this.startHeartbeater();
  }

  /**
    Builds an event of type T from the JSON payload, emits it on the event
    emitter, resolves its deferreds and finally destroys the event object.
  */
  private void emitDispatchEvent(T)(ref JSON obj) {
    T v = new T(this.client, obj);
    this.eventEmitter.emit!T(v);
    v.resolveDeferreds();
    v.destroy();
  }

  /**
    Handles a DISPATCH packet by emitting the event class matching `type`.

    Params:
      seq  = sequence number of the packet; stored if larger than the current one
      type = dispatch event name (e.g. "MESSAGE_CREATE")
      obj  = JSON reader the event object is constructed from
  */
  private void handleDispatchPacket(uint seq, string type, ref JSON obj) {
    // Update sequence number if it's larger than what we have
    if (seq > this.seq) {
      this.seq = seq;
    }

    switch (type) {
      case "READY":
        this.emitDispatchEvent!Ready(obj);
        break;
      case "RESUMED":
        this.emitDispatchEvent!Resumed(obj);
        break;
      case "CHANNEL_CREATE":
        this.emitDispatchEvent!ChannelCreate(obj);
        break;
      case "CHANNEL_UPDATE":
        this.emitDispatchEvent!ChannelUpdate(obj);
        break;
      case "CHANNEL_DELETE":
        this.emitDispatchEvent!ChannelDelete(obj);
        break;
      case "GUILD_BAN_ADD":
        this.emitDispatchEvent!GuildBanAdd(obj);
        break;
      case "GUILD_BAN_REMOVE":
        this.emitDispatchEvent!GuildBanRemove(obj);
        break;
      case "GUILD_CREATE":
        this.emitDispatchEvent!GuildCreate(obj);
        break;
      case "GUILD_UPDATE":
        this.emitDispatchEvent!GuildUpdate(obj);
        break;
      case "GUILD_DELETE":
        this.emitDispatchEvent!GuildDelete(obj);
        break;
      case "GUILD_EMOJIS_UPDATE":
        this.emitDispatchEvent!GuildEmojisUpdate(obj);
        break;
      case "GUILD_INTEGRATIONS_UPDATE":
        this.emitDispatchEvent!GuildIntegrationsUpdate(obj);
        break;
      case "GUILD_MEMBER_ADD":
        this.emitDispatchEvent!GuildMemberAdd(obj);
        break;
      case "GUILD_MEMBER_UPDATE":
        this.emitDispatchEvent!GuildMemberUpdate(obj);
        break;
      case "GUILD_MEMBER_REMOVE":
        this.emitDispatchEvent!GuildMemberRemove(obj);
        break;
      case "GUILD_ROLE_CREATE":
        this.emitDispatchEvent!GuildRoleCreate(obj);
        break;
      case "GUILD_ROLE_UPDATE":
        this.emitDispatchEvent!GuildRoleUpdate(obj);
        break;
      case "GUILD_ROLE_DELETE":
        this.emitDispatchEvent!GuildRoleDelete(obj);
        break;
      case "MESSAGE_CREATE":
        this.emitDispatchEvent!MessageCreate(obj);
        break;
      case "MESSAGE_UPDATE":
        this.emitDispatchEvent!MessageUpdate(obj);
        break;
      case "MESSAGE_DELETE":
        this.emitDispatchEvent!MessageDelete(obj);
        break;
      case "PRESENCE_UPDATE":
        this.emitDispatchEvent!PresenceUpdate(obj);
        break;
      case "TYPING_START":
        this.emitDispatchEvent!TypingStart(obj);
        break;
      case "USER_SETTINGS_UPDATE":
        this.emitDispatchEvent!UserSettingsUpdate(obj);
        break;
      case "USER_UPDATE":
        this.emitDispatchEvent!UserUpdate(obj);
        break;
      case "VOICE_STATE_UPDATE":
        this.emitDispatchEvent!VoiceStateUpdate(obj);
        break;
      case "VOICE_SERVER_UPDATE":
        this.emitDispatchEvent!VoiceServerUpdate(obj);
        break;
      default:
        this.log.warningf("Unhandled dispatch event: %s", type);
        break;
    }
  }

  /**
    Parses one raw gateway message and routes it by opcode.

    The top-level keys are visited in the order they appear. `op`, `t` and `s`
    are remembered and used when the `d` (data) key is reached, so whatever
    has been read up to that point is what the packet handler sees.

    Params:
      rawData = JSON text of a single gateway message
  */
  private void parse(string rawData) {
    auto json = parseTrustedJSON(rawData);

    uint seq;
    string type;
    OPCode op;

    // Scan over each key, store any extra information until we hit the data payload
    foreach (key; json.byKey) {
      switch (key) {
        case "op":
          op = cast(OPCode)json.read!ushort;
          break;
        case "t":
          type = json.read!string;
          break;
        case "s":
          seq = json.read!uint;
          break;
        case "d":
          switch (op) {
            case OPCode.DISPATCH:
              this.handleDispatchPacket(seq, type, json);
              break;
            default:
              this.log.warningf("Unhandled gateway packet: %s", op);
              break;
          }
          break;
        default:
          this.log.tracef("K: %s", key);
          break;
      }
    }
  }

  /**
    Heartbeat loop: while connected, sends a HeartbeatPacket carrying the
    current sequence number, then sleeps for `hb_interval` milliseconds.
  */
  private void heartbeat() {
    while (this.connected) {
      this.send(new HeartbeatPacket(this.seq));
      sleep(this.hb_interval.msecs);
    }
  }

  /**
    Runs the GatewayClient until completion.

    Sends a resume payload when a session ID and sequence number are known,
    otherwise an identify payload, then reads and parses messages until the
    socket has no more data. Afterwards a reconnect is attempted via `start()`
    unless more than MAX_RECONNECTS reconnects have already been made.
  */
  void run() {
    string data;

    // If we already have a sequence number, attempt to resume
    if (this.sessionID && this.seq) {
      this.log.infof("Sending Resume Payload (we where %s at %s)", this.sessionID, this.seq);
      this.send(new ResumePacket(this.client.token, this.sessionID, this.seq));
    } else {
      // On startup, send the identify payload
      this.log.info("Sending Identify Payload");
      this.send(new IdentifyPacket(this.client.token));
    }

    this.log.info("Connected to Gateway");
    this.connected = true;

    while (this.sock.waitForData()) {
      if (!this.connected) break;

      ubyte[] compressed;
      bool binaryFrame = true;

      // Try the frame as binary first; if that is rejected, read it as text below.
      try {
        compressed = this.sock.receiveBinary();
      } catch (Exception e) {
        binaryFrame = false;
      }

      if (binaryFrame) {
        // The binary frame is already consumed at this point, so a failed
        // decompression must not fall through to receiveText(); skip the frame.
        try {
          data = cast(string)uncompress(compressed);
        } catch (Exception e) {
          this.log.warningf("failed to decompress gateway frame (%s)", e.msg);
          continue;
        }
      } else {
        data = this.sock.receiveText();
      }

      if (data == "") {
        continue;
      }

      // A single bad packet must not take down the read loop.
      try {
        this.parse(data);
      } catch (Exception e) {
        this.log.warningf("failed to handle %s (%s)", e, data);
      } catch (Error e) {
        this.log.warningf("failed to handle %s (%s)", e, data);
      }
    }

    this.log.critical("Gateway websocket closed");
    this.connected = false;
    this.reconnects++;

    if (this.reconnects > MAX_RECONNECTS) {
      // Report the configured limit, not the (limit + 1) counter value.
      this.log.errorf("Max Gateway WS reconnects (%s) hit, aborting...", MAX_RECONNECTS);
      return;
    }

    // After the first failed attempt, drop the session so the next connection
    // identifies from scratch instead of resuming, and back off briefly.
    if (this.reconnects > 1) {
      this.sessionID = null;
      this.seq = 0;
      this.log.warning("Waiting 5 seconds before reconnecting...");
      sleep(5.seconds);
    }

    this.log.info("Attempting reconnection...");
    return this.start();
  }
}