1 module dscord.voice.client;
2 
3 import core.time,
4        core.stdc.time,
5        std.stdio,
6        std.zlib,
7        std.functional,
8        std.array,
9        std.stdio,
10        std.bitmanip,
11        std.outbuffer,
12        std.string;
13 
14 import vibe.core.core,
15        vibe.core.net,
16        vibe.inet.url,
17        vibe.http.websockets;
18 
19 // import dcad.types : DCAFile;
20 
21 import dscord.client,
22        dscord.gateway.packets,
23        dscord.gateway.events,
24        dscord.voice.packets,
25        dscord.types.all,
26        dscord.util.emitter;
27 
/**
  The variable part of the 12-byte header that prefixes each voice frame:
  sequence number, timestamp and SSRC. The two leading constant bytes are
  added by `pack`.
*/
struct RTPHeader {
  ushort  seq;
  uint    ts;
  uint    ssrc;

  this(ushort seq, uint ts, uint ssrc) {
    this.ssrc = ssrc;
    this.ts = ts;
    this.seq = seq;
  }

  /**
    Serializes the header to its wire form.

    Returns: a freshly allocated 12-byte array laid out as
      0x80, 0x78, seq (2 bytes), ts (4 bytes), ssrc (4 bytes),
      with all multi-byte fields in big-endian order.
  */
  ubyte[] pack() {
    // Convert each field to network byte order up front.
    ubyte[2] seqBytes = nativeToBigEndian(this.seq);
    ubyte[4] tsBytes = nativeToBigEndian(this.ts);
    ubyte[4] ssrcBytes = nativeToBigEndian(this.ssrc);

    // Two constant leading bytes, followed by the three fields in order.
    ubyte[] packed = [0x80, 0x78];
    packed ~= seqBytes[];
    packed ~= tsBytes[];
    packed ~= ssrcBytes[];
    return packed;
  }
}
49 
/**
  UDP side of a voice connection. Opens a local UDP socket towards the voice
  server, performs IP discovery to learn the externally visible ip/port, and
  then runs a background task that reads incoming datagrams.
*/
class UDPVoiceClient {
  VoiceClient    vc;
  UDPConnection  conn;

  // Local connection info (filled in by IP discovery in connect())
  string  ip;
  ushort  port;

  // Voice audio info
  ushort  seq;
  uint    ts;

  this(VoiceClient vc) {
    this.vc = vc;
  }

  /**
    Receive loop, started as a task by connect(). Received datagrams are
    currently read and discarded.
  */
  void run() {
    while (true) {
      auto data = this.conn.recv();
      // this.vc.log.infof("got data %s", cast(string)data);
    }
  }

  /**
    Connects the UDP socket to the voice server and performs IP discovery.

    Params:
      hostname = voice server host to connect to
      port = voice server UDP port
      timeout = how long to wait for the IP discovery response

    Returns: true when discovery succeeded (this.ip / this.port are set and
      the receive task has been started); false when no response arrived in
      time or the response was malformed. On failure the socket is closed.
  */
  bool connect(string hostname, ushort port, Duration timeout=5.seconds) {
    this.conn = listenUDP(0);
    this.conn.connect(hostname, port);

    // Send IP discovery payload: 70 bytes, SSRC as a 32-bit big-endian value
    // followed by zero padding. The SSRC is widened explicitly so that four
    // bytes are written regardless of the declared width of vc.ssrc.
    OutBuffer b = new OutBuffer();
    b.write(nativeToBigEndian(cast(uint)this.vc.ssrc));
    b.fill0(70 - b.toBytes.length);
    this.conn.send(b.toBytes);

    // Wait for the IP discovery response, maybe timeout after a bit
    string data;
    try {
      data = cast(string)this.conn.recv(timeout);
    } catch (Exception e) {
      // Do not leak the socket when discovery fails.
      this.conn.close();
      return false;
    }

    // The response must at least hold the 4-byte prefix and the 2-byte port.
    if (data.length < 6) {
      this.conn.close();
      return false;
    }

    // Parse the IP discovery response: a NUL-terminated IP string starting
    // at offset 4, with the port stored in the last two bytes.
    auto payload = data[4 .. $ - 2];
    auto terminator = payload.indexOf('\0');
    if (terminator < 0) {
      // No terminator before the port bytes; treat the response as malformed
      // instead of slicing with a negative index.
      this.conn.close();
      return false;
    }

    this.ip = payload[0 .. terminator];
    ubyte[2] portBytes = cast(ubyte[])(data)[data.length - 2..data.length];
    this.port = littleEndianToNative!(ushort, 2)(portBytes);
    this.vc.log.tracef("voice hoststring is %s:%s", ip, port);

    // Finally actually start running the task
    runTask(toDelegate(&this.run));
    return true;
  }

  /+
  void playDCA(DCAFile obj) {
    foreach (frame; obj.frames) {
      RTPHeader header;
      header.seq = this.seq++;
      header.ts = (this.ts += frame.size);
      header.ssrc = this.vc.ssrc;
      this.vc.log.tracef("s %s, t %s, ss %s", header.seq, header.ts, header.ssrc);
      ubyte[] raw = header.pack() ~ frame.data;
      this.vc.log.tracef("sending frame (%s + %s)", header.pack().length, frame.data.length);
      this.conn.send(raw);
      sleep((1.seconds / 1000) * 30);
    }
  }
  +/
}
118 
/**
  Client for a single voice channel. Requests a voice connection through the
  main gateway, talks to the voice websocket (heartbeats, protocol selection)
  and owns the UDPVoiceClient used for the UDP side of the connection.
*/
class VoiceClient {
  // Global client
  Client     client;

  // Voice channel we're for
  Channel    channel;

  // Packet emitter
  Emitter  packetEmitter;

  // UDP Client
  UDPVoiceClient  udp;

  private {
    Logger     log;
    TaskMutex      waitForConnectedMutex;
    TaskCondition  waitForConnected;

    // Voice websocket
    WebSocket  sock;

    // Heartbeater task
    Task  heartbeater;

    // Listener for VOICE_SERVER_UPDATE events
    EventListener  l;
  }

  // Various connection attributes
  string  token;
  URL     endpoint;
  bool    connected = false;
  ushort  ssrc;
  ushort  port;
  ushort  heartbeat_interval;
  bool    mute;
  bool    deaf;
  bool    speaking = false;

  /**
    Params:
      c = voice channel to connect to; its client is used for logging and
          gateway access
      mute = initial self-mute flag sent with the voice state update
      deaf = initial self-deaf flag sent with the voice state update
  */
  this(Channel c, bool mute=false, bool deaf=false) {
    this.channel = c;
    this.client = c.client;
    this.log = this.client.log;
    this.mute = mute;
    this.deaf = deaf;

    this.packetEmitter = new Emitter;
    this.packetEmitter.listen!VoiceReadyPacket(toDelegate(&this.handleVoiceReadyPacket));
    this.packetEmitter.listen!VoiceSessionDescriptionPacket(
        toDelegate(&this.handleVoiceSessionDescription));
  }

  /// Updates the speaking flag and notifies the voice server, but only when the value changes.
  void setSpeaking(bool value) {
    if (this.speaking == value) return;

    this.speaking = value;
    this.send(new VoiceSpeakingPacket(value, 0));
  }

  /**
    Handles the voice READY packet: stores the connection parameters, starts
    the heartbeat task, performs UDP connect/IP discovery and selects the
    protocol.

    Throws: Exception when the UDP connect/discovery step fails.
  */
  void handleVoiceReadyPacket(VoiceReadyPacket p) {
    this.log.tracef("Got VoiceReadyPacket");
    this.ssrc = p.ssrc;
    this.port = p.port;
    this.heartbeat_interval = p.heartbeat_interval;

    // Spawn the heartbeater
    this.heartbeater = runTask(toDelegate(&this.heartbeat));

    // Open up the UDP Connection and perform IP discovery.
    // The call must not live inside assert(): assert expressions are not
    // evaluated in -release builds, which would skip the connect entirely.
    this.udp = new UDPVoiceClient(this);
    if (!this.udp.connect(this.endpoint.host, this.port)) {
      throw new Exception("Failed to UDPVoiceClient connect/discover");
    }

    // Select the protocol
    this.send(new VoiceSelectProtocolPacket("udp", "plain", this.udp.ip, this.udp.port));
  }

  /// Handles the session description packet by waking up anyone blocked in connect().
  void handleVoiceSessionDescription(VoiceSessionDescriptionPacket p) {
    // Notify the waitForConnected condition
    this.waitForConnected.notifyAll();
  }

  /+
  void playDCAFile(DCAFile f) {
    this.udp.playDCA(f);
  }
  +/

  /// Heartbeat loop: sends a heartbeat packet every heartbeat_interval milliseconds while connected.
  void heartbeat() {
    while (this.connected) {
      uint unixTime = cast(uint)core.stdc.time.time(null);
      // NOTE(review): `unixTime * 1000` is evaluated in 32-bit unsigned
      // arithmetic and wraps for present-day epoch values. Widening it
      // depends on the parameter type of VoiceHeartbeatPacket, which is
      // declared outside this file - confirm before changing.
      this.send(new VoiceHeartbeatPacket(unixTime * 1000));
      sleep(this.heartbeat_interval.msecs);
    }
  }

  /*
  void dispatch(JSONObject obj) {
    this.log.tracef("voice-dispatch: %s %s", obj.get!VoiceOPCode("op"), obj.dumps);

    switch (obj.get!VoiceOPCode("op")) {
      case VoiceOPCode.VOICE_READY:
        this.packetEmitter.emit!VoiceReadyPacket(new VoiceReadyPacket(obj));
        break;
      case VoiceOPCode.VOICE_SESSION_DESCRIPTION:
        this.packetEmitter.emit!VoiceSessionDescriptionPacket(
            new VoiceSessionDescriptionPacket(obj));
        break;
      default:
        break;
    }
  }
  */

  /// Serializes a packet to JSON and sends it over the voice websocket.
  void send(Serializable p) {
    JSONValue data = p.serialize();
    this.log.tracef("voice-send: %s", data.toString);
    this.sock.send(data.toString);
  }

  /**
    Read loop for the voice websocket. Runs until the socket reports no more
    data, then logs that the websocket closed.
  */
  void run() {
    string data;

    while (this.sock.waitForData()) {
      // Not possible to recv compressed data on the voice ws right now, but lets future guard
      try {
        ubyte[] rawdata = this.sock.receiveBinary();
        data = cast(string)uncompress(rawdata);
      } catch (Exception e) {
        data = this.sock.receiveText();
      }

      if (data == "") {
        continue;
      }

      try {
        // this.dispatch(new JSONObject(data));
      } catch (Exception e) {
        // Formatting variant, since the message contains format specifiers.
        this.log.warningf("failed to handle voice dispatch: %s (%s)", e, data);
      }
    }

    this.log.warning("voice websocket closed");
  }

  /*
  void onVoiceServerUpdate(VoiceServerUpdate event) {
    if (this.channel.guild_id != event.guild_id) {
      return;
    }

    // TODO: handle server moving
    this.token = event.token;
    this.connected = true;

    // Grab endpoint and create a proper URL out of it
    this.endpoint = URL("ws", event.endpoint.split(":")[0], 0, Path());
    this.sock = connectWebSocket(this.endpoint);
    runTask(toDelegate(&this.run));

    // Send identify
    this.send(new VoiceIdentifyPacket(
      this.channel.guild_id,
      this.client.state.me.id,
      this.client.gw.session_id,
      this.token
    ));
  }
  */

  /**
    Requests a voice connection for the channel and waits until the session
    description has been received.

    Params:
      timeout = maximum time to wait for the connection to be established

    Returns: true once connected; false on timeout, in which case
      disconnect() has been called.
  */
  bool connect(Duration timeout=5.seconds) {
    this.waitForConnectedMutex = new TaskMutex;
    this.waitForConnected = new TaskCondition(this.waitForConnectedMutex);

    //this.l = this.client.gw.eventEmitter.listen!VoiceServerUpdate(toDelegate(
    //  &this.onVoiceServerUpdate));

    this.client.gw.send(new VoiceStateUpdatePacket(
      this.channel.guild.id,
      this.channel.id,
      this.mute,
      this.deaf
    ));

    // Wait for connection
    synchronized (this.waitForConnectedMutex) {
      if (this.waitForConnected.wait(timeout)) {
        return true;
      } else {
        this.disconnect();
        return false;
      }
    }
  }

  /**
    Tears down the voice connection: stops the heartbeat loop, closes the
    websocket, unbinds the gateway listener and sends a voice state update
    with channel id 0.
  */
  void disconnect() {
    this.connected = false;

    // Both may still be unset, e.g. when connect() timed out before the
    // websocket was opened or a listener was registered.
    if (this.sock !is null) {
      this.sock.close();
    }
    if (this.l !is null) {
      this.l.unbind();
    }

    this.client.gw.send(new VoiceStateUpdatePacket(
      this.channel.guild.id,
      0, // TODO
      this.mute,
      this.deaf
    ));
  }
}