/**
  Manages Discord voice connections.

  This module contains the voice websocket client (`VoiceClient`), the UDP
  transport wrapper used to ship encrypted audio frames (`UDPVoiceClient`) and
  the small RTP header helper used when building those frames.
*/
module dscord.voice.client;

import core.time,
       core.stdc.time,
       std.stdio,
       std.zlib,
       std.array,
       std.stdio,
       std.bitmanip,
       std.outbuffer,
       std.string,
       std.algorithm.comparison;

import vibe.core.core,
       vibe.core.net,
       vibe.inet.url,
       vibe.http.websockets;

import dcad.types : DCAFile;

import shaker : crypto_secretbox_easy;

import dscord.types,
       dscord.voice,
       dscord.client,
       dscord.gateway,
       dscord.util.emitter,
       dscord.util.ticker;

/// VoiceClient connection states. The numeric ordering matters: the client
/// compares states with `>=` (e.g. `state >= VoiceStatus.CONNECTED`).
enum VoiceStatus {
  DISCONNECTED = 0,
  CONNECTING = 1,
  CONNECTED = 2,
  READY = 3,
}

/// RTPHeader used for sending RTP data
struct RTPHeader {
  /// Sequence number of the current frame
  ushort  seq;

  /// Timestamp of the current frame
  uint    ts;

  /// Source ID of the current sender
  uint    ssrc;

  this(ushort seq, uint ts, uint ssrc) {
    this.seq = seq;
    this.ts = ts;
    this.ssrc = ssrc;
  }

  /**
    Returns a packed (in bytes) version of this header.

    Layout (12 bytes total): the two fixed bytes 0x80 and 0x78, followed by
    the sequence number, timestamp and SSRC, each in big-endian byte order.
  */
  ubyte[] pack() {
    OutBuffer b = new OutBuffer();
    b.write('\x80');
    b.write('\x78');
    b.write(nativeToBigEndian(this.seq));
    b.write(nativeToBigEndian(this.ts));
    b.write(nativeToBigEndian(this.ssrc));
    return b.toBytes;
  }
}

/// UDP Connection wrapper for the VoiceClient
class UDPVoiceClient {
  /// Parent VoiceClient reference
  VoiceClient    vc;

  // UDP Connection
  UDPConnection  conn;

  private {
    // Local connection info (as reported back by IP discovery)
    string  ip;
    ushort  port;

    // Running state
    bool  running;
  }

  this(VoiceClient vc) {
    this.vc = vc;
  }

  /**
    Receive loop for the UDP connection. Incoming datagrams are read and
    discarded; the loop exists so that `running` reflects whether the
    connection is still usable.
  */
  void run() {
    // Remember which connection this task serves, so a task left over from a
    // previous connect() can never clobber the state of a newer connection.
    auto current = this.conn;
    this.running = true;

    while (this.running && current is this.conn) {
      try {
        current.recv();
      } catch (Exception e) {
        // The connection failed or was closed. Clear the flag so the player
        // notices the lost connection instead of sending into the void.
        if (current is this.conn) {
          this.running = false;
        }
        break;
      }
    }
  }

  /// Stops the receive loop and closes the UDP connection (best effort).
  void close() {
    this.running = false;

    if (this.conn is null) return;

    // Closing is best-effort; nothing useful can be done on failure.
    try {
      this.conn.close();
    } catch (Exception e) {
    } catch (Error e) {}
  }

  /**
    Opens the UDP connection and performs IP discovery.

    Params:
      hostname = voice server host to connect to
      port = voice server UDP port
      timeout = how long to wait for the IP discovery response

    Returns: true when discovery succeeded and the receive task was started,
      false when the response timed out or was malformed.
  */
  bool connect(string hostname, ushort port, Duration timeout=5.seconds) {
    // When reconnecting on an existing instance, release the previous socket
    // first so it is not leaked.
    if (this.conn !is null) {
      this.running = false;
      try {
        this.conn.close();
      } catch (Exception e) {}
    }

    this.conn = listenUDP(0);
    this.conn.connect(hostname, port);

    // Send IP discovery payload: the SSRC (big endian) zero-padded to 70 bytes
    OutBuffer b = new OutBuffer();
    b.write(nativeToBigEndian(this.vc.ssrc));
    b.fill0(70 - b.toBytes.length);
    this.conn.send(b.toBytes);

    // Wait for the IP discovery response, maybe timeout after a bit
    string data;
    try {
      data = cast(string)this.conn.recv(timeout);
    } catch (Exception e) {
      return false;
    }

    // The response must at least hold the 4 leading bytes plus the 2 trailing
    // port bytes, and the IP string must be NUL-terminated.
    if (data.length < 6) {
      return false;
    }

    auto terminator = data[4..data.length].indexOf(0x00);
    if (terminator < 0) {
      return false;
    }

    // Parse the IP discovery response: IP is a NUL-terminated string starting
    // at offset 4, the port is the last two bytes (little endian).
    this.ip = data[4..(terminator + 4)];
    ubyte[2] portBytes = cast(ubyte[])(data)[data.length - 2..data.length];
    this.port = littleEndianToNative!(ushort, 2)(portBytes);

    // Finally actually start running the task
    runTask(&this.run);
    return true;
  }
}

/// Client for a single voice channel: owns the voice websocket, the UDP
/// transport and the audio player task.
class VoiceClient {
  /// Global client which owns this VoiceClient
  Client     client;

  /// The channel this VoiceClient is attached to
  Channel    channel;

  /// Packet emitter
  Emitter  packetEmitter;

  /// UDP Client connection
  UDPVoiceClient  udp;

  // Current voice connection state
  VoiceStatus  state = VoiceStatus.DISCONNECTED;

  // Currently playing item + player task
  Playable  playable;

  private {
    // Logger reference
    Logger  log;

    // Event triggered when connection is complete
    ManualEvent  waitForConnected;

    // Player task
    Task  playerTask;

    // Voice websocket
    WebSocket  sock;

    // Heartbeater task
    Task  heartbeater;

    // Bumped whenever a new heartbeater is spawned, so that heartbeat loops
    // belonging to a previous session notice and exit.
    uint  heartbeatGeneration;

    // Secret key + encryption state
    ubyte[32]  secretKey;
    ubyte[12]  headerRaw;
    ubyte[24]  nonceRaw;

    // Various connection attributes
    string  token;
    URL     endpoint;
    // The SSRC is a 32-bit value (see RTPHeader.ssrc); keeping it as a uint
    // also makes the IP discovery packet carry all four SSRC bytes.
    uint    ssrc;
    ushort  port;

    // Track mute/deaf states
    bool    mute, deaf;

    // Track the current speaking state
    bool    speaking = false;

    // Used to track VoiceServerUpdates
    EventListener  updateListener;

    // Used to control pausing state
    ManualEvent pauseEvent;
  }

  /**
    Creates a voice client for the given channel.

    Params:
      c = channel to attach to
      mute = initial self-mute state sent with the voice state update
      deaf = initial self-deaf state sent with the voice state update
  */
  this(Channel c, bool mute=false, bool deaf=false) {
    this.channel = c;
    this.client = c.client;
    this.log = this.client.log;

    this.mute = mute;
    this.deaf = deaf;

    this.packetEmitter = new Emitter;
    this.packetEmitter.listen!VoiceReadyPacket(&this.handleVoiceReadyPacket);
    this.packetEmitter.listen!VoiceSessionDescriptionPacket(
      &this.handleVoiceSessionDescription);
  }

  /// Set the speaking state. Does nothing if the state is unchanged.
  void setSpeaking(bool value) {
    if (this.speaking == value) return;

    this.speaking = value;
    this.send(new VoiceSpeakingPacket(value, 0));
  }

  /// Handles VOICE_READY: starts heartbeating, opens the UDP connection,
  /// performs IP discovery and selects the voice protocol.
  private void handleVoiceReadyPacket(VoiceReadyPacket p) {
    this.ssrc = p.ssrc;
    this.port = p.port;

    // Spawn the heartbeater. Bump the generation first so that a heartbeater
    // left over from a previous session stops instead of running in parallel.
    this.heartbeatGeneration++;
    this.heartbeater = runTask(&this.heartbeat, p.heartbeatInterval);

    // If we don't have a UDP connection open (e.g. not reconnecting), open one
    //  now.
    if (!this.udp) {
      this.udp = new UDPVoiceClient(this);
    }

    // Then actually connect and perform IP discovery
    if (!this.udp.connect(this.endpoint.host, this.port)) {
      this.log.warning("VoiceClient failed to connect over UDP and perform IP discovery");
      this.disconnect(false);
      return;
    }

    // TODO: ensure the mode is supported

    // Select the protocol
    this.send(new VoiceSelectProtocolPacket("udp", "xsalsa20_poly1305", this.udp.ip, this.udp.port));
  }

  /// Handles VOICE_SESSION_DESCRIPTION: stores the secret key, announces our
  /// SSRC, and marks the connection READY.
  private void handleVoiceSessionDescription(VoiceSessionDescriptionPacket p) {
    this.log.tracef("Recieved VoiceSessionDescription, finished connection sequence.");

    // Guard the slice below; without a full key we cannot encrypt anything.
    if (p.secretKey.length < 32) {
      this.log.warning("VoiceSessionDescription contained a truncated secret key");
      return;
    }

    // Note: the key is deliberately never logged.
    this.secretKey = cast(ubyte[32])p.secretKey[0..32];

    // Toggle our voice speaking state so everyone learns our SSRC
    this.send(new VoiceSpeakingPacket(true, 0));
    this.send(new VoiceSpeakingPacket(false, 0));
    sleep(250.msecs);

    // Set the state to READY, we can now send voice data
    this.state = VoiceStatus.READY;

    // Emit the connected event
    this.waitForConnected.emit();

    // If we were paused (e.g. in the process of reconnecting), unpause now
    if (this.paused) {
      // For whatever reason, if we don't sleep here sometimes clients won't accept our audio
      sleep(1.seconds);
      this.resume();
    }
  }

  /// Whether the player is currently paused
  @property bool paused() {
    return (this.pauseEvent !is null);
  }

  /**
    Pause the player.

    Params:
      wait = when already paused, wait for the current pause to be resumed and
        then pause again instead of failing

    Returns: false if already paused and `wait` is false, true otherwise.
  */
  bool pause(bool wait=false) {
    if (this.pauseEvent) {
      if (!wait) return false;
      this.pauseEvent.wait();
    }

    this.pauseEvent = createManualEvent();
    return true;
  }

  /// Resume the player. Returns false if the player was not paused.
  bool resume() {
    if (!this.paused) {
      return false;
    }

    // Avoid race conditions by copying
    auto e = this.pauseEvent;
    this.pauseEvent = null;
    e.emit();
    return true;
  }

  /// Player task body: pulls frames from the current playable, encrypts them
  /// and sends them over UDP at the playable's frame interval.
  private void runPlayer() {
    this.playable.start();

    if (!this.playable.hasMoreFrames()) {
      this.log.warning("Playable ran out of frames before playing");
      return;
    }

    this.setSpeaking(true);

    // Create a new timing ticker at the frame duration interval
    StaticTicker ticker = new StaticTicker(this.playable.getFrameDuration().msecs, true);

    RTPHeader header;
    header.ssrc = this.ssrc;

    ubyte[] frame;

    while (this.playable.hasMoreFrames()) {
      // If the UDP connection isn't running, this is pointless
      if (!this.udp || !this.udp.running) {
        this.log.warning("UDPVoiceClient lost connection while playing audio");
        this.setSpeaking(false);
        return;
      }

      // If we're paused, wait until we unpause to continue playing. Make sure
      //  to set speaking here in case users connect during this period.
      if (this.paused) {
        // Only set our speaking status if we're still connected
        if (this.sock.connected) this.setSpeaking(false);

        // setSpeaking may yield, and resume() can clear pauseEvent while we
        // are suspended. Snapshot it and only wait if we are still paused.
        auto pending = this.pauseEvent;
        if (pending !is null) {
          pending.wait();
        }
        this.setSpeaking(true);

        // Reset the ticker so we don't fast forward it to catch up
        ticker.reset();
      }

      // Get the next frame from the playable, and send it
      frame = this.playable.nextFrame();
      header.seq++;

      // Encrypt the packet. The nonce is the 12 byte RTP header; the remaining
      // 12 bytes of nonceRaw are never written and keep their initial zeros.
      this.headerRaw = header.pack();
      this.nonceRaw[0..12] = headerRaw;

      // The encrypted payload is 16 bytes longer than the plaintext frame.
      ubyte[] payload;
      payload.length = 16 + frame.length;

      // The call must live outside of assert(): asserts are compiled out in
      // release builds, which would silently skip the encryption entirely.
      auto result = crypto_secretbox_easy(
        payload.ptr,
        frame.ptr, frame.length,
        this.nonceRaw,
        this.secretKey
      );

      if (result != 0) {
        this.log.warning("Failed to encrypt voice frame, stopping playback");
        this.setSpeaking(false);
        return;
      }

      // And send the header + encrypted payload
      this.udp.conn.send(this.headerRaw ~ payload);
      header.ts += this.playable.getFrameSize();

      // Wait until it's time to play the next frame
      ticker.sleep();
    }

    this.setSpeaking(false);
  }

  /// Whether the player is currently active
  @property bool playing() {
    return (this.playerTask && this.playerTask.running);
  }

  /// Plays a Playable, replacing (and terminating) anything currently playing.
  /// The client must be in the READY state.
  VoiceClient play(Playable p) {
    assert(this.state == VoiceStatus.READY, "Must be connected to play audio");

    // If we are currently playing something, kill it
    if (this.playerTask && this.playerTask.running) {
      this.playerTask.terminate();
    }

    this.playable = p;
    this.playerTask = runTask(&this.runPlayer);
    return this;
  }

  /// Heartbeat task body: sends a heartbeat every `heartbeatInterval`
  /// milliseconds for as long as this session stays connected.
  private void heartbeat(ushort heartbeatInterval) {
    // Exit once a newer heartbeater has been spawned for a new session.
    immutable generation = this.heartbeatGeneration;

    while (this.state >= VoiceStatus.CONNECTED && generation == this.heartbeatGeneration) {
      uint unixTime = cast(uint)core.stdc.time.time(null);
      // NOTE(review): `unixTime * 1000` overflows uint for current timestamps,
      //  so the value sent wraps around. Left as-is because widening depends on
      //  the VoiceHeartbeatPacket constructor type, which lives elsewhere.
      this.send(new VoiceHeartbeatPacket(unixTime * 1000));
      sleep(heartbeatInterval.msecs);
    }
  }

  /// Deserializes the JSON payload into packet type T and emits it.
  private void dispatchVoicePacket(T)(VibeJSON obj) {
    T packet = deserializeFromJSON!(T)(obj);
    this.packetEmitter.emit!T(packet);
  }

  /// Parses a raw websocket message and dispatches it based on its opcode.
  private void parse(string rawData) {
    VibeJSON json = parseJsonString(rawData);

    VoiceOPCode op = json["op"].get!VoiceOPCode;

    version (DEBUG_GATEWAY_DATA) {
      this.log.tracef("VOICE RECV: %s", rawData);
    }

    switch (op) {
      case VoiceOPCode.VOICE_READY:
        this.dispatchVoicePacket!VoiceReadyPacket(json["d"]);
        break;
      case VoiceOPCode.VOICE_SESSION_DESCRIPTION:
        this.dispatchVoicePacket!VoiceSessionDescriptionPacket(json["d"]);
        break;
      case VoiceOPCode.VOICE_HEARTBEAT:
      case VoiceOPCode.VOICE_SPEAKING:
        // Ignored
        break;
      default:
        this.log.warningf("Unhandled voice packet: %s", op);
        break;
    }
  }

  /// Sends a payload to the websocket. Send failures are logged, not thrown.
  void send(Serializable p) {
    string data = p.serialize().toString;

    version (DEBUG_GATEWAY_DATA) {
      this.log.tracef("VOICE SEND: %s", data);
    }

    try {
      this.sock.send(data);
    } catch (Exception e) {
      this.log.warningf("ERROR: %s", e.toString);
    }
  }

  /// Runs this voice client: reads websocket messages until the socket closes,
  /// then attempts a full reconnect if the connection had been READY.
  void run() {
    // Bind this task to the socket it was started for. A voice server change
    // replaces this.sock, and the task serving the old socket must not start
    // reading from the new one.
    auto ws = this.sock;
    string data;

    while (ws.waitForData()) {
      // Not possible to recv compressed data on the voice ws right now, but lets future guard
      try {
        ubyte[] rawdata = ws.receiveBinary();
        data = cast(string)uncompress(rawdata);
      } catch (Exception e) {
        data = ws.receiveText();
      }

      if (data == "") {
        continue;
      }

      // Handler failures (including Errors such as range violations) must not
      // take down the read loop.
      try {
        this.parse(data);
      } catch (Exception e) {
        this.log.warningf("failed to handle %s (%s)", e, data);
      } catch (Error e) {
        this.log.warningf("failed to handle %s (%s)", e, data);
      }
    }

    this.log.warningf("Lost voice websocket connection in state %s", this.state);

    // If we were in state READY, reconnect fully. Only do so when the socket
    //  we lost is still the active one; otherwise a newer connection already
    //  replaced it and must not be torn down.
    if (ws is this.sock && this.state == VoiceStatus.READY) {
      this.log.warning("Attempting reconnection of voice connection");
      this.disconnect(false);
      this.connect();
    }
  }

  /// Handles a VoiceServerUpdate for our guild by (re)connecting the voice
  /// websocket to the announced endpoint and identifying.
  private void onVoiceServerUpdate(VoiceServerUpdate event) {
    if (this.channel.guild.id != event.guildID || !event.token) {
      return;
    }

    // Ignore updates carrying a different token than the one we already hold
    if (this.token && event.token != this.token) {
      return;
    } else {
      this.token = event.token;
    }

    // If we're connected (e.g. have a WS open), close it so we can reconnect
    //  to the new voice endpoint.
    if (this.state >= VoiceStatus.CONNECTED) {
      this.log.warningf("Voice server updated while connected to voice, attempting server change");

      // If we're playing, pause until we finish reconnecting
      if (!this.paused && this.playing) {
        this.log.tracef("pausing player while we reconnect");
        this.pause();
      }

      // Set state before we close so we don't attempt to reconnect
      this.state = VoiceStatus.CONNECTED;
      if (this.sock.connected) this.sock.close();
    }

    // Make sure our state is now CONNECTED
    this.state = VoiceStatus.CONNECTED;

    // Grab endpoint and create a proper URL out of it (any ":port" suffix is
    //  dropped)
    this.endpoint = URL("ws", event.endpoint.split(":")[0], 0, Path());
    this.sock = connectWebSocket(this.endpoint);
    runTask(&this.run);

    // Send identify
    this.send(new VoiceIdentifyPacket(
      this.channel.guild.id,
      this.client.state.me.id,
      this.client.gw.sessionID,
      this.token
    ));
  }

  /**
    Attempt a connection to the voice channel this VoiceClient is attached to.

    Params:
      timeout = how long to wait for the connection sequence to complete

    Returns: true once connected, false if the attempt timed out (in which
      case the client is disconnected again).
  */
  bool connect(Duration timeout=5.seconds) {
    this.state = VoiceStatus.CONNECTING;
    this.waitForConnected = createManualEvent();

    // Start listening for VoiceServerUpdates
    this.updateListener = this.client.gw.eventEmitter.listen!VoiceServerUpdate(
      &this.onVoiceServerUpdate
    );

    // Send our VoiceStateUpdate
    this.client.gw.send(new VoiceStateUpdatePacket(
      this.channel.guild.id,
      this.channel.id,
      this.mute,
      this.deaf
    ));

    // Wait for connection event to be emitted (or timeout and disconnect)
    if (this.waitForConnected.wait(timeout, 0)) {
      return true;
    } else {
      this.disconnect(false);
      return false;
    }
  }

  /// Disconnects from the voice channel. If clean is true, waits to finish playing.
  void disconnect(bool clean=true) {
    if (this.playing) {
      if (clean) {
        this.log.tracef("Requested CLEAN voice disconnect, waiting...");
        this.playerTask.join();
        this.log.tracef("Executing previously requested CLEAN voice disconnect");
      }
    }

    // Tell the gateway we are leaving (channel id 0)
    this.client.gw.send(new VoiceStateUpdatePacket(
      this.channel.guild.id,
      0,
      this.mute,
      this.deaf
    ));

    // Always make sure our updateListener is unbound
    this.updateListener.unbind();

    // NOTE(review): `token` is not reset here, so a later connect() will
    //  ignore server updates carrying a different token - confirm intended.

    // If we're actually connected, close the voice socket. State is cleared
    //  first so the read loop does not attempt a reconnect.
    if (this.state >= VoiceStatus.CONNECTING) {
      this.state = VoiceStatus.DISCONNECTED;
      if (this.sock && this.sock.connected) this.sock.close();
    }

    // If we have a UDP connection, close it
    if (this.udp) {
      this.udp.close();
      this.udp.destroy();
      this.udp = null;
    }

    // Finally set state to disconnected
    this.state = VoiceStatus.DISCONNECTED;
  }
}