module dscord.voice.client;

import core.time,
       core.stdc.time,
       std.stdio,
       std.zlib,
       std.functional,
       std.array,
       std.stdio,
       std.bitmanip,
       std.outbuffer,
       std.string;

import vibe.core.core,
       vibe.core.net,
       vibe.inet.url,
       vibe.http.websockets;

// import dcad.types : DCAFile;

import dscord.client,
       dscord.gateway.packets,
       dscord.gateway.events,
       dscord.voice.packets,
       dscord.types.all,
       dscord.util.emitter;

/**
  Header prepended to every voice frame sent over UDP.

  Serialized by `pack` as two fixed bytes (0x80, 0x78) followed by the
  sequence number, timestamp and SSRC, each in big-endian byte order.
*/
struct RTPHeader {
  ushort  seq;
  uint    ts;
  uint    ssrc;

  this(ushort seq, uint ts, uint ssrc) {
    this.seq = seq;
    this.ts = ts;
    this.ssrc = ssrc;
  }

  /**
    Serializes the header.

    Returns: 12 bytes: 0x80, 0x78, seq (2 bytes), ts (4 bytes), ssrc (4 bytes).
  */
  ubyte[] pack() {
    OutBuffer b = new OutBuffer();
    b.write('\x80');
    b.write('\x78');
    b.write(nativeToBigEndian(this.seq));
    b.write(nativeToBigEndian(this.ts));
    b.write(nativeToBigEndian(this.ssrc));
    return b.toBytes;
  }
}

/**
  UDP side of a voice connection: performs IP discovery against the voice
  server and then keeps reading from the socket in a background task.
*/
class UDPVoiceClient {
  VoiceClient    vc;
  UDPConnection  conn;

  // Local connection info
  string  ip;
  ushort  port;

  // Voice audio info
  ushort  seq;
  uint    ts;

  this(VoiceClient vc) {
    this.vc = vc;
  }

  /**
    Receive loop started by `connect`. Incoming datagrams are currently read
    and discarded. The loop has no exit condition of its own.
  */
  void run() {
    while (true) {
      auto data = this.conn.recv();
      // this.vc.log.infof("got data %s", cast(string)data);
    }
  }

  /**
    Opens the UDP socket, performs IP discovery and starts the receive task.

    Params:
      hostname = voice server host to connect to
      port = voice server UDP port
      timeout = how long to wait for the IP discovery response

    Returns: true once `ip` and `port` have been filled in from the discovery
      response; false if no response arrived in time or it was malformed.
  */
  bool connect(string hostname, ushort port, Duration timeout=5.seconds) {
    this.conn = listenUDP(0);
    this.conn.connect(hostname, port);

    // Send IP discovery payload: 70 bytes, SSRC first, zero padded.
    // The SSRC field on the wire is 4 bytes wide, so widen the (ushort)
    // member before encoding; encoding the ushort directly would only emit
    // 2 bytes and shift the value into the upper half of the field.
    // NOTE(review): VoiceClient.ssrc is declared ushort - confirm it cannot
    // truncate the value delivered in VoiceReadyPacket.
    OutBuffer b = new OutBuffer();
    b.write(nativeToBigEndian(cast(uint)this.vc.ssrc));
    b.fill0(70 - b.toBytes.length);
    this.conn.send(b.toBytes);

    // Wait for the IP discovery response, maybe timeout after a bit
    string data;
    try {
      data = cast(string)this.conn.recv(timeout);
    } catch (Exception e) {
      return false;
    }

    // Reject responses that are too short to hold the 4 byte prefix plus the
    // trailing 2 byte port, instead of failing with a range violation below.
    if (data.length < 6) {
      return false;
    }

    // The address is a NUL terminated string starting at offset 4.
    auto terminator = data[4..data.length].indexOf(0x00);
    if (terminator < 0) {
      return false;
    }

    // Parse the IP discovery response
    this.ip = data[4..(terminator + 4)];
    ubyte[2] portBytes = cast(ubyte[])(data)[data.length - 2..data.length];
    this.port = littleEndianToNative!(ushort, 2)(portBytes);

    // Log the discovered local port; a bare `port` here would refer to the
    // remote port parameter, which shadows the member.
    this.vc.log.tracef("voice hoststring is %s:%s", this.ip, this.port);

    // Finally actually start running the task
    runTask(toDelegate(&this.run));
    return true;
  }

  /+
  void playDCA(DCAFile obj) {
    foreach (frame; obj.frames) {
      RTPHeader header;
      header.seq = this.seq++;
      header.ts = (this.ts += frame.size);
      header.ssrc = this.vc.ssrc;
      this.vc.log.tracef("s %s, t %s, ss %s", header.seq, header.ts, header.ssrc);
      ubyte[] raw = header.pack() ~ frame.data;
      this.vc.log.tracef("sending frame (%s + %s)", header.pack().length, frame.data.length);
      this.conn.send(raw);
      sleep((1.seconds / 1000) * 30);
    }
  }
  +/
}

/**
  Voice connection for a single channel: owns the voice websocket, the
  heartbeat task and the UDP client.
*/
class VoiceClient {
  // Global client
  Client     client;

  // Voice channel we're for
  Channel    channel;

  // Packet emitter
  Emitter  packetEmitter;

  // UDP Client
  UDPVoiceClient  udp;

  private {
    Logger  log;
    TaskMutex  waitForConnectedMutex;
    TaskCondition  waitForConnected;

    // Voice websocket
    WebSocket  sock;

    // Heartbeater task
    Task  heartbeater;

    // Listener for VOICE_SERVER_UPDATE events
    EventListener  l;
  }

  // Various connection attributes
  string  token;
  URL     endpoint;
  bool    connected = false;
  ushort  ssrc;
  ushort  port;
  ushort  heartbeat_interval;
  bool    mute;
  bool    deaf;
  bool    speaking = false;

  /**
    Params:
      c = channel to join; its `client` is used for logging and gateway access
      mute = initial self-mute flag sent with the voice state update
      deaf = initial self-deaf flag sent with the voice state update
  */
  this(Channel c, bool mute=false, bool deaf=false) {
    this.channel = c;
    this.client = c.client;
    this.log = this.client.log;
    this.mute = mute;
    this.deaf = deaf;

    this.packetEmitter = new Emitter;
    this.packetEmitter.listen!VoiceReadyPacket(toDelegate(&this.handleVoiceReadyPacket));
    this.packetEmitter.listen!VoiceSessionDescriptionPacket(
      toDelegate(&this.handleVoiceSessionDescription));
  }

  /// Sends a VoiceSpeakingPacket when the speaking state actually changes.
  void setSpeaking(bool value) {
    if (this.speaking == value) return;

    this.speaking = value;
    this.send(new VoiceSpeakingPacket(value, 0));
  }

  /**
    Stores the session parameters from the ready packet, starts the
    heartbeater, runs UDP IP discovery and selects the voice protocol.
  */
  void handleVoiceReadyPacket(VoiceReadyPacket p) {
    this.log.tracef("Got VoiceReadyPacket");
    this.ssrc = p.ssrc;
    this.port = p.port;
    this.heartbeat_interval = p.heartbeat_interval;

    // Spawn the heartbeater
    this.heartbeater = runTask(toDelegate(&this.heartbeat));

    // Open up the UDP Connection and perform IP discovery.
    // The call must not live inside the assert expression: asserts are
    // compiled out with -release, which would skip the connect entirely.
    this.udp = new UDPVoiceClient(this);
    bool udpConnected = this.udp.connect(this.endpoint.host, this.port);
    assert(udpConnected, "Failed to UDPVoiceClient connect/discover");

    // Only reachable on failure when asserts are disabled; without a
    // discovered address there is nothing valid to select.
    if (!udpConnected) {
      this.log.warning("Failed to UDPVoiceClient connect/discover");
      return;
    }

    // Select the protocol
    this.send(new VoiceSelectProtocolPacket("udp", "plain", this.udp.ip, this.udp.port));
  }

  /// Wakes up `connect`, which is waiting on the waitForConnected condition.
  void handleVoiceSessionDescription(VoiceSessionDescriptionPacket p) {
    // Notify the waitForConnected condition
    this.waitForConnected.notifyAll();
  }

  /+
  void playDCAFile(DCAFile f) {
    this.udp.playDCA(f);
  }
  +/

  /**
    Heartbeat loop: sends a VoiceHeartbeatPacket every `heartbeat_interval`
    milliseconds for as long as `connected` is true.
  */
  void heartbeat() {
    while (this.connected) {
      // NOTE(review): this multiplication is done in uint and wraps for
      // current epoch values; widening it depends on the parameter type of
      // VoiceHeartbeatPacket's constructor, which is not visible here.
      uint unixTime = cast(uint)core.stdc.time.time(null);
      this.send(new VoiceHeartbeatPacket(unixTime * 1000));
      sleep(this.heartbeat_interval.msecs);
    }
  }

  /*
  void dispatch(JSONObject obj) {
    this.log.tracef("voice-dispatch: %s %s", obj.get!VoiceOPCode("op"), obj.dumps);

    switch (obj.get!VoiceOPCode("op")) {
      case VoiceOPCode.VOICE_READY:
        this.packetEmitter.emit!VoiceReadyPacket(new VoiceReadyPacket(obj));
        break;
      case VoiceOPCode.VOICE_SESSION_DESCRIPTION:
        this.packetEmitter.emit!VoiceSessionDescriptionPacket(
          new VoiceSessionDescriptionPacket(obj));
        break;
      default:
        break;
    }
  }
  */

  /// Serializes the packet and sends it as text over the voice websocket.
  void send(Serializable p) {
    JSONValue data = p.serialize();
    this.log.tracef("voice-send: %s", data.toString);
    this.sock.send(data.toString);
  }

  /**
    Websocket read loop: runs until `waitForData` reports that the socket is
    closed. Dispatching of the received payload is currently disabled.
  */
  void run() {
    string data;

    while (this.sock.waitForData()) {
      // Not possible to recv compressed data on the voice ws right now, but
      // let's future-guard.
      // NOTE(review): if receiveBinary consumes the pending frame before it
      // throws, the receiveText fallback reads the *next* message instead -
      // verify against the vibe.d WebSocket implementation in use.
      try {
        ubyte[] rawdata = this.sock.receiveBinary();
        data = cast(string)uncompress(rawdata);
      } catch (Exception e) {
        data = this.sock.receiveText();
      }

      if (data == "") {
        continue;
      }

      try {
        // this.dispatch(new JSONObject(data));
      } catch (Exception e) {
        // warningf (not warning) so the format string is actually applied
        this.log.warningf("failed to handle voice dispatch: %s (%s)", e, data);
      }
    }

    this.log.warning("voice websocket closed");
  }

  /*
  void onVoiceServerUpdate(VoiceServerUpdate event) {
    if (this.channel.guild_id != event.guild_id) {
      return;
    }

    // TODO: handle server moving
    this.token = event.token;
    this.connected = true;

    // Grab endpoint and create a proper URL out of it
    this.endpoint = URL("ws", event.endpoint.split(":")[0], 0, Path());
    this.sock = connectWebSocket(this.endpoint);
    runTask(toDelegate(&this.run));

    // Send identify
    this.send(new VoiceIdentifyPacket(
      this.channel.guild_id,
      this.client.state.me.id,
      this.client.gw.session_id,
      this.token
    ));
  }
  */

  /**
    Requests to join the channel through the gateway and waits for the voice
    session to be established.

    Params:
      timeout = how long to wait for the waitForConnected notification

    Returns: true if the notification arrived within `timeout`; otherwise
      `disconnect` is called and false is returned.
  */
  bool connect(Duration timeout=5.seconds) {
    this.waitForConnectedMutex = new TaskMutex;
    this.waitForConnected = new TaskCondition(this.waitForConnectedMutex);

    //this.l = this.client.gw.eventEmitter.listen!VoiceServerUpdate(toDelegate(
    //  &this.onVoiceServerUpdate));

    this.client.gw.send(new VoiceStateUpdatePacket(
      this.channel.guild.id,
      this.channel.id,
      this.mute,
      this.deaf
    ));

    // Wait for connection
    synchronized (this.waitForConnectedMutex) {
      if (this.waitForConnected.wait(timeout)) {
        return true;
      } else {
        this.disconnect();
        return false;
      }
    }
  }

  /**
    Tears down the voice connection and sends a voice state update with
    channel id 0 through the gateway.
  */
  void disconnect() {
    this.connected = false;

    // `sock` and `l` are only assigned by code that is currently disabled
    // (onVoiceServerUpdate and the listener registration in connect), and
    // connect() calls this on timeout - so either may still be unset.
    // NOTE(review): assumes EventListener is a reference type.
    if (this.sock !is null) {
      this.sock.close();
    }
    if (this.l !is null) {
      this.l.unbind();
    }

    this.client.gw.send(new VoiceStateUpdatePacket(
      this.channel.guild.id,
      0, // TODO
      this.mute,
      this.deaf
    ));
  }
}