1 /**
2   Manages Discord voice connections.
3 */
4 module dscord.voice.client;
5 
import core.time,
       core.stdc.time,
       std.stdio,
       std.zlib,
       std.array,
       std.stdio,
       std.bitmanip,
       std.outbuffer,
       std.string,
       std.algorithm.comparison;
16 
17 import vibe.core.core,
18        vibe.core.net,
19        vibe.inet.url,
20        vibe.http.websockets;
21 
22 import dcad.types : DCAFile;
23 
24 import shaker : crypto_secretbox_easy;
25 
26 import dscord.types,
27        dscord.voice,
28        dscord.client,
29        dscord.gateway,
30        dscord.util.emitter,
31        dscord.util.ticker;
32 
/// Connection states of a VoiceClient, in ascending order of progress.
/// States are compared with `>=`, so the declaration order is significant.
enum VoiceStatus {
  DISCONNECTED,  // 0
  CONNECTING,    // 1
  CONNECTED,     // 2
  READY,         // 3
}
40 
/// RTP header that is prepended to each voice frame sent over UDP.
struct RTPHeader {
  /// Sequence number of the current frame
  ushort  seq;

  /// Timestamp of the current frame
  uint    ts;

  /// Source ID (SSRC) of the current sender
  uint    ssrc;

  /// Build a header from its sequence number, timestamp and source ID.
  this(ushort seq, uint ts, uint ssrc) {
    this.ssrc = ssrc;
    this.ts   = ts;
    this.seq  = seq;
  }

  /// Serialise the header into 12 bytes: the fixed bytes 0x80 and 0x78,
  /// followed by `seq`, `ts` and `ssrc`, each in big-endian order.
  ubyte[] pack() {
    auto packed = new ubyte[12];
    packed[0] = 0x80;
    packed[1] = 0x78;
    packed[2 .. 4]  = nativeToBigEndian(this.seq);
    packed[4 .. 8]  = nativeToBigEndian(this.ts);
    packed[8 .. 12] = nativeToBigEndian(this.ssrc);
    return packed;
  }
}
69 
/// UDP connection wrapper for the VoiceClient. Performs IP discovery against
/// the voice server and keeps a receive loop running on the socket.
class UDPVoiceClient {
  /// Parent VoiceClient reference
  VoiceClient    vc;

  /// Underlying UDP connection; only set once `connect` has been called
  UDPConnection  conn;

  private {
    // Address and port of this client as reported by IP discovery
    string  ip;
    ushort  port;

    // Whether the receive loop is running. The VoiceClient player checks this
    //  before sending audio.
    bool  running;
  }

  this(VoiceClient vc) {
    this.vc = vc;
  }

  /// Receive loop for the current connection. Received packets are read and
  /// discarded. The loop ends when `close` is called or a receive fails, and
  /// `running` is then cleared so callers can tell the connection is gone.
  void run() {
    this.running = true;

    // Remember which connection this task services. A later `connect` swaps
    //  in a new connection, and this (now stale) task must not touch the
    //  state belonging to it.
    auto current = this.conn;

    scope (exit) {
      if (this.conn is current) this.running = false;
    }

    while (this.running && this.conn is current) {
      try {
        current.recv();
      } catch (Exception e) {
        // recv fails once the socket is closed (or errors out); stop here
        break;
      }
    }
  }

  /// Stop the receive loop and close the UDP socket. This is best-effort:
  /// failures while closing are swallowed so teardown can continue.
  void close() {
    this.running = false;

    // Nothing to close if connect() was never called
    if (this.conn is null) return;

    try {
      this.conn.close();
    } catch (Exception e) {
      // Best-effort: a failing close must not abort the caller's teardown
    } catch (Error e) {
      // Also swallowed so teardown proceeds even if close raises an Error
    }
  }

  /**
    Open a UDP socket to the voice server, perform IP discovery and start the
    receive loop. Any previously opened connection is closed first.

    Params:
      hostname = voice server host
      port = voice server UDP port
      timeout = how long to wait for the IP discovery response

    Returns: true on success; false if the discovery response timed out or
      was too short to parse.
  */
  bool connect(string hostname, ushort port, Duration timeout=5.seconds) {
    // Drop any previous connection (e.g. when reconnecting) so its socket is
    //  not leaked and its receive task exits.
    this.close();

    this.conn = listenUDP(0);
    this.conn.connect(hostname, port);

    // Send IP discovery payload: 70 bytes, starting with our SSRC as a 4-byte
    //  big-endian integer and zero-filled after that. The cast widens the SSRC
    //  so that all 4 bytes are written regardless of the field's declared type.
    ubyte[70] discovery;
    discovery[0 .. 4] = nativeToBigEndian(cast(uint)this.vc.ssrc);
    this.conn.send(discovery[]);

    // Wait for the IP discovery response, maybe timeout after a bit
    string data;
    try {
      data = cast(string)this.conn.recv(timeout);
    } catch (Exception e) {
      return false;
    }

    // Response layout: the IP is a NUL-terminated string starting at byte 4,
    //  and the port is in the last two bytes (little-endian). Reject responses
    //  too short to contain both parts.
    if (data.length < 6) return false;

    string address = data[4 .. $ - 2];
    auto terminator = address.indexOf('\0');
    this.ip = (terminator >= 0) ? address[0 .. terminator] : address;

    ubyte[2] portBytes = cast(const(ubyte)[])data[$ - 2 .. $];
    this.port = littleEndianToNative!(ushort, 2)(portBytes);

    // Finally actually start running the task
    this.running = true;
    runTask(&this.run);
    return true;
  }
}
135 
/// A voice connection to a single guild voice channel. It drives the voice
/// websocket, the UDP audio connection and an optional audio player task.
class VoiceClient {
  /// Global client which owns this VoiceClient
  Client     client;

  /// The channel this VoiceClient is attached to
  Channel    channel;

  /// Emitter for packets received on the voice websocket
  Emitter  packetEmitter;

  /// UDP Client connection
  UDPVoiceClient  udp;

  /// Current voice connection state
  VoiceStatus state = VoiceStatus.DISCONNECTED;

  /// Currently playing item (driven by the private player task)
  Playable  playable;

  private {
    // Logger reference
    Logger  log;

    // Event triggered when connection is complete
    ManualEvent  waitForConnected;

    // Player task
    Task  playerTask;

    // Voice websocket
    WebSocket  sock;

    // Heartbeater task
    Task  heartbeater;

    // Incremented each time a heartbeater is spawned. Older heartbeaters see
    //  the mismatch and exit instead of running alongside the new one.
    uint  heartbeatGeneration;

    // Secret key + encryption state
    ubyte[32] secretKey;
    ubyte[12] headerRaw;
    ubyte[24] nonceRaw;

    // Various connection attributes. The SSRC is kept as 32 bits to match
    //  RTPHeader.ssrc.
    string  token;
    URL     endpoint;
    uint    ssrc;
    ushort  port;

    // Track mute/deaf states
    bool    mute, deaf;

    // Track the current speaking state
    bool    speaking = false;

    // Used to track VoiceServerUpdates
    EventListener  updateListener;

    // Used to control pausing state (non-null while paused)
    ManualEvent pauseEvent;
  }

  /**
    Params:
      c = voice channel to attach to
      mute = mute flag sent with our voice state updates
      deaf = deaf flag sent with our voice state updates
  */
  this(Channel c, bool mute=false, bool deaf=false) {
    this.channel = c;
    this.client = c.client;
    this.log = this.client.log;

    this.mute = mute;
    this.deaf = deaf;

    this.packetEmitter = new Emitter;
    this.packetEmitter.listen!VoiceReadyPacket(&this.handleVoiceReadyPacket);
    this.packetEmitter.listen!VoiceSessionDescriptionPacket(
      &this.handleVoiceSessionDescription);
  }

  /// Set the speaking state. A packet is only sent when the value changes.
  void setSpeaking(bool value) {
    if (this.speaking == value) return;

    this.speaking = value;
    this.send(new VoiceSpeakingPacket(value, 0));
  }

  // Handles VOICE_READY: stores the SSRC and port, starts heartbeating,
  //  performs UDP IP discovery and then selects the protocol.
  private void handleVoiceReadyPacket(VoiceReadyPacket p) {
    this.ssrc = p.ssrc;
    this.port = p.port;

    // Spawn the heartbeater. Incrementing the generation makes any heartbeater
    //  left over from a previous connection exit on its next wakeup.
    uint generation = ++this.heartbeatGeneration;
    this.heartbeater = runTask(&this.heartbeat, p.heartbeatInterval, generation);

    // If we don't have a UDP connection open (e.g. not reconnecting), open one
    //  now.
    if (!this.udp) {
      this.udp = new UDPVoiceClient(this);
    }

    // Then actually connect and perform IP discovery
    if (!this.udp.connect(this.endpoint.host, this.port)) {
      this.log.warning("VoiceClient failed to connect over UDP and perform IP discovery");
      this.disconnect(false);
      return;
    }

    // TODO: ensure the mode is supported

    // Select the protocol. Audio frames are encrypted with crypto_secretbox in
    //  runPlayer.
    this.send(new VoiceSelectProtocolPacket("udp", "xsalsa20_poly1305", this.udp.ip, this.udp.port));
  }

  // Handles SESSION_DESCRIPTION: stores the secret key, marks the client READY,
  //  wakes connect() and resumes a paused player.
  private void handleVoiceSessionDescription(VoiceSessionDescriptionPacket p) {
    this.log.tracef("Recieved VoiceSessionDescription, finished connection sequence.");

    this.secretKey = cast(ubyte[32])p.secretKey[0..32];

    // This key encrypts all audio for the session, so only dump it in builds
    //  that explicitly trace raw voice traffic.
    version (DEBUG_GATEWAY_DATA) {
      this.log.tracef("secret_key %s", this.secretKey);
    }

    // Toggle our voice speaking state so everyone learns our SSRC
    this.send(new VoiceSpeakingPacket(true, 0));
    this.send(new VoiceSpeakingPacket(false, 0));
    sleep(250.msecs);

    // Set the state to READY, we can now send voice data
    this.state = VoiceStatus.READY;

    // Emit the connected event
    this.waitForConnected.emit();

    // If we were paused (e.g. in the process of reconnecting), unpause now.
    //  NOTE(review): this also resumes a pause that the user requested.
    if (this.paused) {
      // For whatever reason, if we don't sleep here sometimes clients won't accept our audio
      sleep(1.seconds);
      this.resume();
    }
  }

  /// Whether the player is currently paused
  @property bool paused() {
    return (this.pauseEvent !is null);
  }

  /// Pause the player. If already paused, returns false, unless `wait` is set;
  /// in that case it waits for the current pause to be resumed and pauses again.
  bool pause(bool wait=false) {
    if (this.pauseEvent) {
      if (!wait) return false;
      this.pauseEvent.wait();
    }

    this.pauseEvent = createManualEvent();
    return true;
  }

  /// Resume the player. Returns false if it was not paused.
  bool resume() {
    if (!this.paused) {
      return false;
    }

    // Avoid race conditions by copying
    auto e = this.pauseEvent;
    this.pauseEvent = null;
    e.emit();
    return true;
  }

  // Player task body: encrypts each frame of `playable` and sends it over UDP,
  //  paced by the playable's frame duration.
  private void runPlayer() {
    this.playable.start();

    if (!this.playable.hasMoreFrames()) {
      this.log.warning("Playable ran out of frames before playing");
      return;
    }

    this.setSpeaking(true);

    // Create a new timing ticker at the frame duration interval
    StaticTicker ticker = new StaticTicker(this.playable.getFrameDuration().msecs, true);

    RTPHeader header;
    header.ssrc = this.ssrc;

    ubyte[] frame;

    while (this.playable.hasMoreFrames()) {
      // If we're paused, wait until we unpause to continue playing. Make sure
      //  to set speaking here in case users connect during this period.
      if (this.paused) {
        // Grab the event first. setSpeaking may yield, and resume() clears the
        //  field, so it can already be null by the time we wait.
        auto resumed = this.pauseEvent;

        // Only set our speaking status if we're still connected
        if (this.sock.connected) this.setSpeaking(false);

        // A fresh event starts at emit count 0, so this returns immediately if
        //  resume() already fired while we were yielding above.
        resumed.wait(0);
        this.setSpeaking(true);

        // Reset the ticker so we don't fast forward it to catch up
        ticker.reset();
      }

      // If the UDP connection isn't running, this is pointless. This check runs
      //  after any pause, so a disconnect while paused is seen before sending.
      if (!this.udp || !this.udp.running) {
        this.log.warning("UDPVoiceClient lost connection while playing audio");
        this.setSpeaking(false);
        return;
      }

      // Get the next frame from the playable, and send it
      frame = this.playable.nextFrame();
      header.seq++;

      // The nonce is the 12-byte RTP header followed by zeros
      this.headerRaw = header.pack();
      this.nonceRaw[0..12] = headerRaw;

      // 16 extra bytes hold the authentication tag added by crypto_secretbox
      ubyte[] payload;
      payload.length = 16 + frame.length;

      // Encrypt outside of assert(): asserts are compiled out in -release
      //  builds, which would skip the encryption entirely.
      immutable encryptResult = crypto_secretbox_easy(
        payload.ptr,
        frame.ptr, frame.length,
        this.nonceRaw,
        this.secretKey,
      );

      if (encryptResult != 0) {
        this.log.warning("Failed to encrypt voice frame, stopping playback");
        this.setSpeaking(false);
        return;
      }

      // And send the header + encrypted payload
      this.udp.conn.send(this.headerRaw ~ payload);
      header.ts += this.playable.getFrameSize();

      // Wait until its time to play the next frame
      ticker.sleep();
    }

    this.setSpeaking(false);
  }

  /// Whether the player is currently active
  @property bool playing() {
    return (this.playerTask && this.playerTask.running);
  }

  /// Plays a Playable, terminating any playback already in progress.
  /// The client must be in the READY state.
  VoiceClient play(Playable p) {
    assert(this.state == VoiceStatus.READY, "Must be connected to play audio");

    // If we are currently playing something, kill it
    if (this.playerTask && this.playerTask.running) {
      this.playerTask.terminate();
    }

    this.playable = p;
    this.playerTask = runTask(&this.runPlayer);
    return this;
  }

  // Heartbeat loop. Runs while connected and while it is still the newest
  //  heartbeater (see heartbeatGeneration).
  private void heartbeat(ushort heartbeatInterval, uint generation) {
    while (this.state >= VoiceStatus.CONNECTED && generation == this.heartbeatGeneration) {
      // NOTE(review): `unixTime * 1000` is uint arithmetic and wraps. It is left
      //  as-is because VoiceHeartbeatPacket's parameter type isn't visible here.
      uint unixTime = cast(uint)core.stdc.time.time(null);
      this.send(new VoiceHeartbeatPacket(unixTime * 1000));
      sleep(heartbeatInterval.msecs);
    }
  }

  // Deserializes `obj` as a T and emits it on the packet emitter.
  private void dispatchVoicePacket(T)(VibeJSON obj) {
    T packet = deserializeFromJSON!(T)(obj);
    this.packetEmitter.emit!T(packet);
  }

  // Parses one raw voice websocket message and dispatches it by opcode.
  private void parse(string rawData) {
    VibeJSON json = parseJsonString(rawData);

    VoiceOPCode op = json["op"].get!VoiceOPCode;

    version (DEBUG_GATEWAY_DATA) {
      this.log.tracef("VOICE RECV: %s", rawData);
    }

    switch (op) {
      case VoiceOPCode.VOICE_READY:
        this.dispatchVoicePacket!VoiceReadyPacket(json["d"]);
        break;
      case VoiceOPCode.VOICE_SESSION_DESCRIPTION:
        this.dispatchVoicePacket!VoiceSessionDescriptionPacket(json["d"]);
        break;
      case VoiceOPCode.VOICE_HEARTBEAT:
      case VoiceOPCode.VOICE_SPEAKING:
        // Ignored
        break;
      default:
        this.log.warningf("Unhandled voice packet: %s", op);
        break;
    }
  }

  /// Sends a payload to the websocket. Send failures are logged rather than
  /// thrown.
  void send(Serializable p) {
    string data = p.serialize().toString;

    version (DEBUG_GATEWAY_DATA) {
      this.log.tracef("VOICE SEND: %s", data);
    }

    try {
      this.sock.send(data);
    } catch (Exception e) {
      this.log.warningf("ERROR: %s", e.toString);
    }
  }

  /// Runs this voice client: reads and dispatches websocket messages until the
  /// socket closes. If the connection was READY at that point, reconnects fully.
  void run() {
    // Bind to the socket this task was started for. A voice server change
    //  replaces `this.sock` with a new socket that gets its own run task.
    auto ws = this.sock;
    string data;

    while (ws.waitForData()) {
      // Not possible to recv compressed data on the voice ws right now, but lets future guard.
      //  NOTE(review): presumably receiveBinary throws on a text frame without
      //  consuming it, which lets receiveText read it. Confirm against vibe.d.
      try {
        ubyte[] rawdata = ws.receiveBinary();
        data = cast(string)uncompress(rawdata);
      } catch (Exception e) {
        data = ws.receiveText();
      }

      if (data == "") {
        continue;
      }

      try {
        this.parse(data);
      } catch (Exception e) {
        this.log.warningf("failed to handle %s (%s)", e, data);
      } catch (Error e) {
        this.log.warningf("failed to handle %s (%s)", e, data);
      }
    }

    this.log.warningf("Lost voice websocket connection in state %s", this.state);

    // If we were in state READY and this is still the active socket, reconnect
    //  fully
    if (this.state == VoiceStatus.READY && this.sock is ws) {
      this.log.warning("Attempting reconnection of voice connection");
      this.disconnect(false);
      this.connect();
    }
  }

  // Handles VoiceServerUpdate events for our guild: opens (or reopens) the
  //  voice websocket to the announced endpoint and identifies on it.
  private void onVoiceServerUpdate(VoiceServerUpdate event) {
    // Ignore other guilds, and updates that carry no token or no endpoint
    //  (there is nothing to connect to yet)
    if (this.channel.guild.id != event.guildID || !event.token || !event.endpoint.length) {
      return;
    }

    if (this.token && event.token != this.token) {
      return;
    } else {
      this.token = event.token;
    }

    // If we're connected (e.g. have a WS open), close it so we can reconnect
    //  to the new voice endpoint.
    if (this.state >= VoiceStatus.CONNECTED) {
      this.log.warningf("Voice server updated while connected to voice, attempting server change");

      // If we're playing, pause until we finish reconnecting
      if (!this.paused && this.playing) {
        this.log.tracef("pausing player while we reconnect");
        this.pause();
      }

      // Set state before we close so we don't attempt to reconnect
      this.state = VoiceStatus.CONNECTED;
      if (this.sock.connected) this.sock.close();
    }

    // Make sure our state is now CONNECTED
    this.state = VoiceStatus.CONNECTED;

    // Grab endpoint and create a proper URL out of it (any ":port" suffix is dropped)
    this.endpoint = URL("ws", event.endpoint.split(":")[0], 0, Path());
    this.sock = connectWebSocket(this.endpoint);
    runTask(&this.run);

    // Send identify
    this.send(new VoiceIdentifyPacket(
      this.channel.guild.id,
      this.client.state.me.id,
      this.client.gw.sessionID,
      this.token
    ));
  }

  /// Attempt a connection to the voice channel this VoiceClient is attached to.
  /// Returns: true if the connection reached READY within `timeout`; otherwise
  ///   disconnects and returns false.
  bool connect(Duration timeout=5.seconds) {
    this.state = VoiceStatus.CONNECTING;
    this.waitForConnected = createManualEvent();

    // Start listening for VoiceServerUpdates
    this.updateListener = this.client.gw.eventEmitter.listen!VoiceServerUpdate(
      &this.onVoiceServerUpdate
    );

    // Send our VoiceStateUpdate
    this.client.gw.send(new VoiceStateUpdatePacket(
      this.channel.guild.id,
      this.channel.id,
      this.mute,
      this.deaf
    ));

    // Wait for connection event to be emitted (or timeout and disconnect)
    if (this.waitForConnected.wait(timeout, 0)) {
      return true;
    } else {
      this.disconnect(false);
      return false;
    }
  }

  /// Disconnects from the voice channel. If clean is true, waits to finish playing.
  void disconnect(bool clean=true) {
    if (this.playing) {
      if (clean) {
        this.log.tracef("Requested CLEAN voice disconnect, waiting...");
        this.playerTask.join();
        this.log.tracef("Executing previously requested CLEAN voice disconnect");
      }
    }

    // Send a voice state update with channel 0 (presumably "no channel")
    this.client.gw.send(new VoiceStateUpdatePacket(
      this.channel.guild.id,
      0,
      this.mute,
      this.deaf
    ));

    // Unbind our VoiceServerUpdate listener. It only exists after connect(),
    //  and it is cleared so that a repeated disconnect doesn't unbind it twice.
    if (this.updateListener) {
      this.updateListener.unbind();
      this.updateListener = null;
    }

    // If we're actually connected, close the voice socket. The state is changed
    //  first so run() doesn't treat the close as a lost READY connection.
    if (this.state >= VoiceStatus.CONNECTING) {
      this.state = VoiceStatus.DISCONNECTED;
      if (this.sock && this.sock.connected) this.sock.close();
    }

    // If we have a UDP connection, close it
    if (this.udp) {
      this.udp.close();
      this.udp.destroy();
      this.udp = null;
    }

    // Finally set state to disconnected
    this.state = VoiceStatus.DISCONNECTED;
  }
}