diff --git a/Cargo.toml b/Cargo.toml index 7654313..972d9ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tauri-video03" -version = "0.1.1" +version = "0.2.1" description = "A Tauri Video App" authors = ["sinus@sasedev.net"] edition = "2024" @@ -21,6 +21,7 @@ tauri-build = { version = "2", features = [] } base64 = { version = "^0.22", features = [] } chrono = { version = "^0.4", features = ["clock", "serde"] } dirs = { version = "^6.0", features = [] } +cpal = { version = "^0.17", features = [] } futures-util = { version = "^0.3", features = [] } gstreamer = { version = "^0.25", features = ["serde"] } gstreamer-app = { version = "^0.25", features = [] } diff --git a/frontend/chat.ts b/frontend/chat.ts deleted file mode 100644 index 9d148ed..0000000 --- a/frontend/chat.ts +++ /dev/null @@ -1,198 +0,0 @@ -import WebSocket from "@tauri-apps/plugin-websocket"; -import { PeerInfo } from './ts/bindings/PeerInfo'; -import { ClientWsMessage } from './ts/bindings/ClientWsMessage'; -import { ServerWsMessage } from './ts/bindings/ServerWsMessage'; - -type RemoveListener = (() => void) | null; - -export class LocalSignallingClient { - private socket: WebSocket | null = null; - private removeListener: RemoveListener = null; - private myPeerId: string | null = null; - private peers: PeerInfo[] = []; - - constructor( - private readonly displayName: string, - private readonly onLog: (message: string) => void, - private readonly onPeers: (peers: PeerInfo[]) => void, - private readonly onChat: (line: string) => void, - ) {} - - public async connect(): Promise { - if (this.socket !== null) { - this.onLog("signalling already connected"); - return; - } - - const url = "ws://127.0.0.1:3012"; - - this.onLog(`connecting to ${url}`); - - let socket: WebSocket; - try { - socket = await WebSocket.connect(url); - } catch (error) { - throw new Error(`websocket connection failed: ${String(error)}`); - } - - this.socket = socket; - - const removeListener = socket.addListener((message) => { - if (message.type === "Text") { - this.handleServerMessage(message.data); - return; - } - - if (message.type === "Close") { - this.onLog("signalling disconnected"); - this.cleanupDisconnectedState(); - return; - } - - if (message.type === "Binary") { - this.onLog("unexpected binary websocket message"); - return; - } - }); - - this.removeListener = removeListener; - - this.onLog("signalling connected"); - - await this.send({ - type: "hello", - display_name: this.displayName, - }); - } - - public async disconnect(): Promise { - if (this.socket === null) { - return; - } - - const socket = this.socket; - - if (this.removeListener !== null) { - this.removeListener(); - this.removeListener = null; - } - - this.socket = null; - - try { - await socket.disconnect(); - } catch (error) { - this.onLog(`disconnect error: ${String(error)}`); - } - - this.cleanupDisconnectedState(); - } - - public async sendChat(text: string): Promise { - await this.send({ - type: "chat_send", - text, - }); - } - - public getMyPeerId(): string | null { - return this.myPeerId; - } - - public getPeers(): PeerInfo[] { - return [...this.peers]; - } - - private async send(message: ClientWsMessage): Promise { - if (this.socket === null) { - this.onLog("cannot send: signalling not connected"); - return; - } - - const payload = JSON.stringify(message); - - try { - await this.socket.send(payload); - } catch (error) { - this.onLog(`send failed: ${String(error)}`); - } - } - - private cleanupDisconnectedState(): void { - this.socket = null; - this.myPeerId = null; - this.peers = []; - this.onPeers([]); - } - - private handleServerMessage(raw: string): void { - let message: ServerWsMessage; - - try { - message = JSON.parse(raw) as ServerWsMessage; - } catch (error) { - this.onLog(`invalid server message: ${String(error)}`); - return; - } - - switch (message.type) { - case "welcome": - this.myPeerId = message.peer_id; - this.onLog(`welcome peer_id=${message.peer_id}`); - break; - - case "peer_list": - this.peers = message.peers; - this.onPeers([...this.peers]); - this.onLog(`peer_list received (${message.peers.length})`); - break; - - case "peer_joined": - this.peers = mergePeer(this.peers, message.peer); - this.onPeers([...this.peers]); - this.onLog(`peer joined: ${message.peer.display_name}`); - break; - - case "peer_left": - this.peers = this.peers.filter((peer) => peer.peer_id !== message.peer_id); - this.onPeers([...this.peers]); - this.onLog(`peer left: ${message.peer_id}`); - break; - - case "chat_receive": - this.onChat(`${message.from_display_name}: ${message.text}`); - break; - - case "offer": - this.onLog(`offer received from ${message.from_peer_id}`); - break; - - case "answer": - this.onLog(`answer received from ${message.from_peer_id}`); - break; - - case "ice_candidate": - this.onLog(`ice candidate received from ${message.from_peer_id}`); - break; - - case "pong": - this.onLog("pong"); - break; - - case "error": - this.onLog(`server error: ${message.message}`); - break; - - default: - this.onLog(`unhandled server message: ${raw}`); - break; - } - } -} - -function mergePeer(peers: PeerInfo[], incoming: PeerInfo): PeerInfo[] { - const next = peers.filter((peer) => peer.peer_id !== incoming.peer_id); - next.push(incoming); - next.sort((a, b) => a.display_name.localeCompare(b.display_name)); - return next; -} diff --git a/frontend/index.html b/frontend/index.html index 61aa395..01adb06 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -5,62 +5,28 @@ + content="default-src 'self'; + script-src 'self'; + style-src 'self' 'unsafe-inline'; + img-src 'self' data: blob:; + connect-src * ipc: http://ipc.localhost ipc://localhost;" /> Tauri GST Signalling + Chat
-

POC 3 - Chat

+

POC 3 - Rust signalling + Rust WebRTC

-

Signalling + Chat

+

Audio

- - - + +
-
-
-

Peers

-
[]
-
- -
-

Logs

-
Ready.
-
-
- -
- - -
- -
No messages yet.
-
- -
-

Video Preview

- -
- Video preview -
-
- -
-

Audio + Video

- -
- - -
- -
Ready.
+
Ready.
@@ -76,15 +42,81 @@
-

Audio

+

Audio + Video

- - + +
-
Ready.
+
Ready.
+
+ +
+

Video Preview

+ +
+ Video preview +
+
+ +
+

Signalling + Chat

+ +
+ + + +
+ +
+
+

Peers

+
[]
+
+ +
+

Logs

+
Ready.
+
+
+ +
+ + +
+ +
No messages yet.
+
+ +
+

Rust WebRTC DataChannel

+ +
+ + + +
+ +
Idle.
+ +
+ + +
+ +
No RTC messages yet.
diff --git a/frontend/main.ts b/frontend/main.ts index 4913b1b..bb1429c 100644 --- a/frontend/main.ts +++ b/frontend/main.ts @@ -1,9 +1,9 @@ +// file: frontend/main.ts import { invoke } from "@tauri-apps/api/core"; import { AvStartResponse } from './ts/bindings/AvStartResponse'; import { AvStopResponse } from './ts/bindings/AvStopResponse'; -import { LocalSignallingClient } from "./chat"; -import { PeerInfo } from './ts/bindings/PeerInfo'; +import { RtcSnapshot } from './ts/bindings/RtcSnapshot'; type Mode = "idle" | "audio" | "video" | "av"; @@ -21,77 +21,83 @@ const avStatus = document.querySelector("#av-status"); const previewElement = document.querySelector("#video-preview"); -const chatDisplayNameInput = document.querySelector("#chat-display-name"); -const chatConnectBtn = document.querySelector("#chat-connect-btn"); -const chatDisconnectBtn = document.querySelector("#chat-disconnect-btn"); -const chatInput = document.querySelector("#chat-input"); -const chatSendBtn = document.querySelector("#chat-send-btn"); -const chatPeers = document.querySelector("#chat-peers"); -const chatLogs = document.querySelector("#chat-logs"); -const chatMessages = document.querySelector("#chat-messages"); +const rtcServerUrlInput = document.querySelector("#rtc-server-url"); +const rtcDisplayNameInput = document.querySelector("#rtc-display-name"); +const rtcConnectBtn = document.querySelector("#rtc-connect-btn"); +const rtcDisconnectBtn = document.querySelector("#rtc-disconnect-btn"); -if ( - startAudioBtn === null || - stopAudioBtn === null || - audioStatus === null || - startVideoBtn === null || - stopVideoBtn === null || - videoStatus === null || - startAvBtn === null || - stopAvBtn === null || - avStatus === null || - previewElement === null || - chatDisplayNameInput === null || - chatConnectBtn === null || - chatDisconnectBtn === null || - chatInput === null || - chatSendBtn === null || - chatPeers === null || - chatLogs === null || - chatMessages === null -) { - throw new Error("missing UI elements"); -} +const rtcPeers = document.querySelector("#rtc-peers"); +const rtcLogs = document.querySelector("#rtc-logs"); + +const rtcChatInput = document.querySelector("#rtc-chat-input"); +const rtcChatSendBtn = document.querySelector("#rtc-chat-send-btn"); +const rtcChatMessages = document.querySelector("#rtc-chat-messages"); + +const rtcTargetPeerIdInput = document.querySelector("#rtc-target-peer-id"); +const rtcStartOfferBtn = document.querySelector("#rtc-start-offer-btn"); +const rtcClosePeerBtn = document.querySelector("#rtc-close-peer-btn"); +const rtcStatus = document.querySelector("#rtc-status"); + +const rtcDirectMessageInput = document.querySelector("#rtc-direct-message-input"); +const rtcDirectSendBtn = document.querySelector("#rtc-direct-send-btn"); +const rtcDirectMessages = document.querySelector("#rtc-direct-messages"); let currentMode: Mode = "idle"; + let previewTimer: number | null = null; let previewRequestInFlight = false; let previewObjectUrl: string | null = null; +let rtcSnapshotTimer: number | null = null; +let rtcSnapshotRequestInFlight = false; +let rtcPollingPaused = false; + function setAudioStatus(message: string): void { - if (audioStatus) + if (audioStatus) { audioStatus.textContent = message; + } } function setVideoStatus(message: string): void { - if (videoStatus) + if (videoStatus) { videoStatus.textContent = message; + } } function setAvStatus(message: string): void { - if (avStatus) + if (avStatus) { avStatus.textContent = message; + } } function setAudioButtons(isRecording: boolean): void { - if (startAudioBtn) + if (startAudioBtn) { startAudioBtn.disabled = isRecording; - if (stopAudioBtn) + } + + if (stopAudioBtn) { stopAudioBtn.disabled = !isRecording; + } } function setVideoButtons(isRecording: boolean): void { - if (startVideoBtn) + if (startVideoBtn) { startVideoBtn.disabled = isRecording; - if (stopVideoBtn) + } + + if (stopVideoBtn) { stopVideoBtn.disabled = !isRecording; + } } function setAvButtons(isRecording: boolean): void { - if (startAvBtn) + if (startAvBtn) { startAvBtn.disabled = isRecording; - if (stopAvBtn) + } + + if (stopAvBtn) { stopAvBtn.disabled = !isRecording; + } } function setAllButtonsForMode(mode: Mode): void { @@ -107,24 +113,30 @@ function setAllButtonsForMode(mode: Mode): void { setAvButtons(mode === "av"); if (mode !== "audio") { - if (stopAudioBtn) + if (stopAudioBtn) { stopAudioBtn.disabled = true; - if (startAudioBtn) + } + if (startAudioBtn) { startAudioBtn.disabled = true; + } } if (mode !== "video") { - if (stopVideoBtn) + if (stopVideoBtn) { stopVideoBtn.disabled = true; - if (startVideoBtn) + } + if (startVideoBtn) { startVideoBtn.disabled = true; + } } if (mode !== "av") { - if (stopAvBtn) + if (stopAvBtn) { stopAvBtn.disabled = true; - if (startAvBtn) + } + if (startAvBtn) { startAvBtn.disabled = true; + } } } @@ -133,6 +145,40 @@ function setCurrentMode(mode: Mode): void { setAllButtonsForMode(mode); } +function setRtcConnected( + signallingConnected: boolean, + dataChannelOpen: boolean, + activeRemotePeerId: string | null, +): void { + const rtcBusy = dataChannelOpen || activeRemotePeerId !== null; + + if (rtcConnectBtn) { + rtcConnectBtn.disabled = signallingConnected; + } + + if (rtcDisconnectBtn) { + rtcDisconnectBtn.disabled = !signallingConnected; + } + + if (rtcChatSendBtn) { + rtcChatSendBtn.disabled = !signallingConnected; + } + + if (rtcStartOfferBtn) { + rtcStartOfferBtn.disabled = !signallingConnected || rtcBusy; + } +} + +function setRtcDataChannelOpen(open: boolean): void { + if (rtcDirectSendBtn) { + rtcDirectSendBtn.disabled = !open; + } + + if (rtcClosePeerBtn) { + rtcClosePeerBtn.disabled = !open; + } +} + function base64ToUint8Array(base64: string): Uint8Array { const binary = window.atob(base64); const bytes = new Uint8Array(binary.length); @@ -147,9 +193,12 @@ function base64ToUint8Array(base64: string): Uint8Array { } function updatePreviewImageFromBase64(encoded: string): void { + if (!previewElement) { + return; + } + const bytes = base64ToUint8Array(encoded); const copied = new Uint8Array(bytes.byteLength); - copied.set(bytes); const blob = new Blob([copied.buffer], { type: "image/jpeg" }); @@ -160,8 +209,7 @@ function updatePreviewImageFromBase64(encoded: string): void { } previewObjectUrl = objectUrl; - if (previewElement) - previewElement.src = objectUrl; + previewElement.src = objectUrl; } async function refreshPreviewFrame(): Promise { @@ -215,213 +263,376 @@ function stopPreviewPolling(): void { previewObjectUrl = null; } - if (previewElement) + if (previewElement) { previewElement.removeAttribute("src"); + } } -startAudioBtn.addEventListener("click", async () => { - if (currentMode !== "idle") { - setAudioStatus("Another mode is already running."); +function renderRtcSnapshot(snapshot: RtcSnapshot): void { + const peersText = JSON.stringify(snapshot.peers, null, 2); + if (rtcPeers && rtcPeers.textContent !== peersText) { + rtcPeers.textContent = peersText; + } + + const logsText = snapshot.logs.join("\n"); + if (rtcLogs && rtcLogs.textContent !== logsText) { + rtcLogs.textContent = logsText; + } + + const chatMessagesText = snapshot.chat_messages.join("\n"); + if (rtcChatMessages && rtcChatMessages.textContent !== chatMessagesText) { + rtcChatMessages.textContent = chatMessagesText; + } + + const directMessagesText = snapshot.rtc_messages.join("\n"); + if (rtcDirectMessages && rtcDirectMessages.textContent !== directMessagesText) { + rtcDirectMessages.textContent = directMessagesText; + } + + if (rtcStatus && rtcStatus.textContent !== snapshot.rtc_status) { + rtcStatus.textContent = snapshot.rtc_status; + } + + if ( + rtcTargetPeerIdInput && + snapshot.active_remote_peer_id !== null && + document.activeElement !== rtcTargetPeerIdInput && + rtcTargetPeerIdInput.value !== snapshot.active_remote_peer_id + ) { + rtcTargetPeerIdInput.value = snapshot.active_remote_peer_id; + } + + if ( + rtcServerUrlInput && + snapshot.server_url.length > 0 && + document.activeElement !== rtcServerUrlInput && + rtcServerUrlInput.value !== snapshot.server_url + ) { + rtcServerUrlInput.value = snapshot.server_url; + } + + if ( + rtcDisplayNameInput && + snapshot.display_name.length > 0 && + document.activeElement !== rtcDisplayNameInput && + rtcDisplayNameInput.value !== snapshot.display_name + ) { + rtcDisplayNameInput.value = snapshot.display_name; + } + + setRtcConnected( + snapshot.signalling_connected, + snapshot.data_channel_open, + snapshot.active_remote_peer_id, + ); +} + +async function refreshRtcSnapshot(): Promise { + if (rtcSnapshotRequestInFlight) { return; } - setCurrentMode("audio"); - - try { - const path = await invoke("start_audio_recording"); - setAudioStatus(`Audio recording started.\nOutput: ${path}`); - } catch (error) { - setAudioStatus(`Start audio failed.\n${String(error)}`); - setCurrentMode("idle"); - } -}); - -stopAudioBtn.addEventListener("click", async () => { - if (currentMode !== "audio") { - setAudioStatus("Audio mode is not running."); + if (rtcPollingPaused) { return; } - stopAudioBtn.disabled = true; + rtcSnapshotRequestInFlight = true; try { - const path = await invoke("stop_audio_recording"); - setAudioStatus(`Audio recording stopped.\nSaved file: ${path}`); - setCurrentMode("idle"); + const snapshot = await invoke("rtc_get_snapshot"); + renderRtcSnapshot(snapshot); } catch (error) { - setAudioStatus(`Stop audio failed.\n${String(error)}`); + console.error("rtc snapshot refresh failed", error); + } finally { + rtcSnapshotRequestInFlight = false; + } +} + +async function refreshRtcSnapshotForced(): Promise { + if (rtcSnapshotRequestInFlight) { + return; + } + + rtcSnapshotRequestInFlight = true; + + try { + const snapshot = await invoke("rtc_get_snapshot"); + renderRtcSnapshot(snapshot); + } catch (error) { + console.error("rtc snapshot forced refresh failed", error); + } finally { + rtcSnapshotRequestInFlight = false; + } +} + +function startRtcSnapshotPolling(): void { + if (rtcSnapshotTimer !== null) { + return; + } + + window.setTimeout(() => { + void refreshRtcSnapshot(); + }, 100); + + rtcSnapshotTimer = window.setInterval(() => { + void refreshRtcSnapshot(); + }, 1000); +} + +if (startAudioBtn) { + startAudioBtn.addEventListener("click", async () => { + if (currentMode !== "idle") { + setAudioStatus("Another mode is already running."); + return; + } + setCurrentMode("audio"); - } -}); -startVideoBtn.addEventListener("click", async () => { - if (currentMode !== "idle") { - setVideoStatus("Another mode is already running."); - return; - } + try { + const path = await invoke("start_audio_recording"); + setAudioStatus(`Audio recording started.\nOutput: ${path}`); + } catch (error) { + setAudioStatus(`Start audio failed.\n${String(error)}`); + setCurrentMode("idle"); + } + }); +} - setCurrentMode("video"); +if (stopAudioBtn) { + stopAudioBtn.addEventListener("click", async () => { + if (currentMode !== "audio") { + setAudioStatus("Audio mode is not running."); + return; + } - try { - const path = await invoke("start_video_recording"); - setVideoStatus(`Video recording started.\nOutput: ${path}`); - startPreviewPolling(); - } catch (error) { - setVideoStatus(`Start video failed.\n${String(error)}`); - stopPreviewPolling(); - setCurrentMode("idle"); - } -}); + stopAudioBtn.disabled = true; -stopVideoBtn.addEventListener("click", async () => { - if (currentMode !== "video") { - setVideoStatus("Video mode is not running."); - return; - } + try { + const path = await invoke("stop_audio_recording"); + setAudioStatus(`Audio recording stopped.\nSaved file: ${path}`); + setCurrentMode("idle"); + } catch (error) { + setAudioStatus(`Stop audio failed.\n${String(error)}`); + setCurrentMode("audio"); + } + }); +} - stopVideoBtn.disabled = true; +if (startVideoBtn) { + startVideoBtn.addEventListener("click", async () => { + if (currentMode !== "idle") { + setVideoStatus("Another mode is already running."); + return; + } - try { - const path = await invoke("stop_video_recording"); - setVideoStatus(`Video recording stopped.\nSaved file: ${path}`); - stopPreviewPolling(); - setCurrentMode("idle"); - } catch (error) { - setVideoStatus(`Stop video failed.\n${String(error)}`); setCurrentMode("video"); - } -}); -startAvBtn.addEventListener("click", async () => { - if (currentMode !== "idle") { - setAvStatus("Another mode is already running."); - return; - } + try { + const path = await invoke("start_video_recording"); + setVideoStatus(`Video recording started.\nOutput: ${path}`); + startPreviewPolling(); + } catch (error) { + setVideoStatus(`Start video failed.\n${String(error)}`); + stopPreviewPolling(); + setCurrentMode("idle"); + } + }); +} - setCurrentMode("av"); +if (stopVideoBtn) { + stopVideoBtn.addEventListener("click", async () => { + if (currentMode !== "video") { + setVideoStatus("Video mode is not running."); + return; + } - try { - const result = await invoke("start_av_recording"); + stopVideoBtn.disabled = true; - setAvStatus( - `AV recording started.\nAudio: ${result.audio_path}\nVideo: ${result.video_path}` - ); + try { + const path = await invoke("stop_video_recording"); + setVideoStatus(`Video recording stopped.\nSaved file: ${path}`); + stopPreviewPolling(); + setCurrentMode("idle"); + } catch (error) { + setVideoStatus(`Stop video failed.\n${String(error)}`); + setCurrentMode("video"); + } + }); +} - startPreviewPolling(); - } catch (error) { - setAvStatus(`Start AV failed.\n${String(error)}`); - stopPreviewPolling(); - setCurrentMode("idle"); - } -}); +if (startAvBtn) { + startAvBtn.addEventListener("click", async () => { + if (currentMode !== "idle") { + setAvStatus("Another mode is already running."); + return; + } -stopAvBtn.addEventListener("click", async () => { - if (currentMode !== "av") { - setAvStatus("AV mode is not running."); - return; - } - - stopAvBtn.disabled = true; - - try { - const result = await invoke("stop_av_recording"); - - setAvStatus( - `AV recording stopped.\nAudio: ${result.audio_path}\nVideo: ${result.video_path}` - ); - - stopPreviewPolling(); - setCurrentMode("idle"); - } catch (error) { - setAvStatus(`Stop AV failed.\n${String(error)}`); setCurrentMode("av"); - } -}); + + try { + const result = await invoke("start_av_recording"); + setAvStatus( + `AV recording started.\nAudio: ${result.audio_path}\nVideo: ${result.video_path}` + ); + startPreviewPolling(); + } catch (error) { + setAvStatus(`Start AV failed.\n${String(error)}`); + stopPreviewPolling(); + setCurrentMode("idle"); + } + }); +} + +if (stopAvBtn) { + stopAvBtn.addEventListener("click", async () => { + if (currentMode !== "av") { + setAvStatus("AV mode is not running."); + return; + } + + stopAvBtn.disabled = true; + + try { + const result = await invoke("stop_av_recording"); + setAvStatus( + `AV recording stopped.\nAudio: ${result.audio_path}\nVideo: ${result.video_path}` + ); + stopPreviewPolling(); + setCurrentMode("idle"); + } catch (error) { + setAvStatus(`Stop AV failed.\n${String(error)}`); + setCurrentMode("av"); + } + }); +} + +if (rtcConnectBtn) { + rtcConnectBtn.addEventListener("click", async () => { + const serverUrl = (rtcServerUrlInput?.value || "").trim() || "ws://127.0.0.1:3012"; + const displayName = (rtcDisplayNameInput?.value || "").trim() || "anonymous"; + + rtcPollingPaused = true; + + try { + await invoke("rtc_connect_signalling", { + serverUrl, + displayName, + }); + await refreshRtcSnapshotForced(); + } catch (error) { + console.error("rtc_connect_signalling failed", error); + } finally { + rtcPollingPaused = false; + } + }); +} + +if (rtcDisconnectBtn) { + rtcDisconnectBtn.addEventListener("click", async () => { + rtcPollingPaused = true; + + try { + await invoke("rtc_disconnect_signalling"); + await refreshRtcSnapshotForced(); + } catch (error) { + console.error("rtc_disconnect_signalling failed", error); + } finally { + rtcPollingPaused = false; + } + }); +} + +if (rtcChatSendBtn) { + rtcChatSendBtn.addEventListener("click", async () => { + const text = (rtcChatInput?.value || "").trim(); + if (text.length === 0) { + return; + } + + rtcPollingPaused = true; + + try { + await invoke("rtc_send_chat_message", { text }); + + if (rtcChatInput) { + rtcChatInput.value = ""; + } + + await refreshRtcSnapshotForced(); + } catch (error) { + console.error("rtc_send_chat_message failed", error); + } finally { + rtcPollingPaused = false; + } + }); +} + +if (rtcStartOfferBtn) { + rtcStartOfferBtn.addEventListener("click", async () => { + const targetPeerId = (rtcTargetPeerIdInput?.value || "").trim(); + if (targetPeerId.length === 0) { + return; + } + + rtcPollingPaused = true; + + if (rtcStartOfferBtn) { + rtcStartOfferBtn.disabled = true; + } + + try { + await invoke("rtc_start_offer", { targetPeerId }); + await refreshRtcSnapshotForced(); + } catch (error) { + console.error("rtc_start_offer failed", error); + } finally { + rtcPollingPaused = false; + } + }); +} + +if (rtcClosePeerBtn) { + rtcClosePeerBtn.addEventListener("click", async () => { + rtcPollingPaused = true; + + try { + await invoke("rtc_close_peer"); + await refreshRtcSnapshotForced(); + } catch (error) { + console.error("rtc_close_peer failed", error); + } finally { + rtcPollingPaused = false; + } + }); +} + +if (rtcDirectSendBtn) { + rtcDirectSendBtn.addEventListener("click", async () => { + const text = (rtcDirectMessageInput?.value || "").trim(); + if (text.length === 0) { + return; + } + + rtcPollingPaused = true; + + try { + await invoke("rtc_send_data_message", { text }); + + if (rtcDirectMessageInput) { + rtcDirectMessageInput.value = ""; + } + + await refreshRtcSnapshotForced(); + } catch (error) { + console.error("rtc_send_data_message failed", error); + } finally { + rtcPollingPaused = false; + } + }); +} setCurrentMode("idle"); - -let signallingClient: LocalSignallingClient | null = null; - -function appendChatLog(line: string): void { - if (chatLogs) { - const current = chatLogs.textContent ?? ""; - chatLogs.textContent = `${current}\n${line}`.trim(); - } -} - -function appendChatMessage(line: string): void { - if (chatMessages) { - const current = chatMessages.textContent ?? ""; - chatMessages.textContent = `${current}\n${line}`.trim(); - } -} - -function updatePeerList(peers: PeerInfo[]): void { - if (chatPeers) - chatPeers.textContent = JSON.stringify(peers, null, 2); -} - -function setChatConnected(connected: boolean): void { - if (chatConnectBtn) - chatConnectBtn.disabled = connected; - if (chatDisconnectBtn) - chatDisconnectBtn.disabled = !connected; - if (chatSendBtn) - chatSendBtn.disabled = !connected; -} - -chatConnectBtn.addEventListener("click", async () => { - if (signallingClient !== null) { - appendChatLog("signalling client already exists"); - return; - } - - const displayName = (chatDisplayNameInput.value || "").trim() || "anonymous"; - - const client = new LocalSignallingClient( - displayName, - (line) => appendChatLog(line), - (peers) => updatePeerList(peers), - (line) => appendChatMessage(line), - ); - - try { - await client.connect(); - signallingClient = client; - setChatConnected(true); - } catch (error) { - appendChatLog(`connect failed: ${String(error)}`); - signallingClient = null; - setChatConnected(false); - } -}); - -chatDisconnectBtn.addEventListener("click", () => { - if (signallingClient === null) { - return; - } - - signallingClient.disconnect(); - signallingClient = null; - setChatConnected(false); - updatePeerList([]); -}); - -chatSendBtn.addEventListener("click", () => { - if (signallingClient === null) { - appendChatLog("not connected"); - return; - } - - const text = chatInput.value.trim(); - if (text.length === 0) { - appendChatLog("chat message is empty"); - return; - } - - signallingClient.sendChat(text); - chatInput.value = ""; -}); - -setChatConnected(false); +setRtcConnected(false, false, null); +setRtcDataChannelOpen(false); +startRtcSnapshotPolling(); diff --git a/frontend/styles.css b/frontend/styles.css index 600abb9..abdc71c 100644 --- a/frontend/styles.css +++ b/frontend/styles.css @@ -10,7 +10,7 @@ body { } .container { - max-width: 980px; + max-width: 1100px; margin: 0 auto; padding: 24px; } @@ -43,6 +43,15 @@ button:disabled { cursor: default; } +input[type="text"] { + border: 1px solid #374151; + border-radius: 8px; + padding: 10px 12px; + background: #0b1220; + color: #f9fafb; + min-width: 220px; +} + .preview-wrap { width: 100%; aspect-ratio: 16/9; @@ -51,7 +60,6 @@ button:disabled { border: 1px solid #374151; border-radius: 8px; overflow: hidden; - margin-bottom: 16px; display: flex; align-items: center; justify-content: center; @@ -78,12 +86,3 @@ pre { gap: 16px; margin-bottom: 16px; } - -input[type="text"] { - border: 1px solid #374151; - border-radius: 8px; - padding: 10px 12px; - background: #0b1220; - color: #f9fafb; - min-width: 220px; -} diff --git a/frontend/ts/bindings/RtcSnapshot.ts b/frontend/ts/bindings/RtcSnapshot.ts new file mode 100644 index 0000000..d2cc993 --- /dev/null +++ b/frontend/ts/bindings/RtcSnapshot.ts @@ -0,0 +1,4 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { PeerInfo } from "./PeerInfo"; + +export type RtcSnapshot = { signalling_connected: boolean, server_url: string, display_name: string, my_peer_id: string | null, peers: Array, logs: Array, chat_messages: Array, rtc_messages: Array, rtc_status: string, data_channel_open: boolean, active_remote_peer_id: string | null, }; diff --git a/package.json b/package.json index 44be8c9..e47c1ce 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "tauri-video03", "private": true, - "version": "0.1.1", + "version": "0.2.1", "type": "module", "scripts": { "dev": "vite", diff --git a/src/app_state.rs b/src/app_state.rs index 279a6f4..82438c9 100644 --- a/src/app_state.rs +++ b/src/app_state.rs @@ -61,6 +61,7 @@ pub struct AppState { pub video: std::sync::Mutex, pub preview: PreviewState, pub av: std::sync::Mutex, + pub rtc: crate::rtc_state::RtcAppState, } impl AppState { @@ -70,6 +71,7 @@ impl AppState { video: std::sync::Mutex::new(VideoRecorderState::new()), preview: PreviewState::new(), av: std::sync::Mutex::new(AvRecorderState::new()), + rtc: crate::rtc_state::RtcAppState::new(), } } } diff --git a/src/lib.rs b/src/lib.rs index e8f93d0..e51d433 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,11 @@ mod error; mod media_audio; mod media_av; mod media_video; +mod rtc_commands; +mod rtc_peer; +mod rtc_signalling; +mod rtc_state; +mod rtc_types; fn init_tracing() { let subscriber_result = tracing_subscriber::fmt() @@ -44,6 +49,13 @@ pub fn run() { commands::get_video_preview_frame_base64, commands::start_av_recording, commands::stop_av_recording, + rtc_commands::rtc_connect_signalling, + rtc_commands::rtc_disconnect_signalling, + rtc_commands::rtc_send_chat_message, + rtc_commands::rtc_start_offer, + rtc_commands::rtc_send_data_message, + rtc_commands::rtc_close_peer, + rtc_commands::rtc_get_snapshot, ]); let run_result = builder.run(tauri::generate_context!()); diff --git a/src/rtc_commands.rs b/src/rtc_commands.rs new file mode 100644 index 0000000..455c882 --- /dev/null +++ b/src/rtc_commands.rs @@ -0,0 +1,79 @@ +// file: src/rtc_commands.rs + +use crate::app_state::AppState; +use crate::rtc_peer; +use crate::rtc_signalling; +use crate::rtc_types::RtcSnapshot; + +#[tauri::command] +pub async fn rtc_connect_signalling( + state: tauri::State<'_, AppState>, + server_url: String, + display_name: String, +) -> Result<(), String> { + rtc_signalling::connect_signalling(state, server_url, display_name).await +} + +#[tauri::command] +pub async fn rtc_disconnect_signalling(state: tauri::State<'_, AppState>) -> Result<(), String> { + rtc_signalling::disconnect_signalling(state).await +} + +#[tauri::command] +pub async fn rtc_send_chat_message( + state: tauri::State<'_, AppState>, + text: String, +) -> Result<(), String> { + let signalling_tx = { + let lock_result = state.rtc.runtime.lock(); + match lock_result { + Ok(runtime_guard) => runtime_guard.signalling_tx.clone(), + Err(_) => { + return Err("rtc runtime state lock poisoned".to_string()); + } + } + }; + + let tx = match signalling_tx { + Some(value) => value, + None => { + return Err("signalling is not connected".to_string()); + } + }; + + let send_result = tx.send(crate::rtc_types::RtcAction::WsSend( + crate::rtc_types::ClientWsMessage::ChatSend { text }, + )); + + if let Err(error) = send_result { + return Err(format!("failed to enqueue chat message: {error}")); + } + + Ok(()) +} + +#[tauri::command] +pub async fn rtc_start_offer( + state: tauri::State<'_, AppState>, + target_peer_id: String, +) -> Result<(), String> { + rtc_peer::create_offer_for_target(state.rtc.clone(), target_peer_id).await +} + +#[tauri::command] +pub async fn rtc_send_data_message( + state: tauri::State<'_, AppState>, + text: String, +) -> Result<(), String> { + rtc_peer::send_data_message(state.rtc.clone(), text).await +} + +#[tauri::command] +pub async fn rtc_close_peer(state: tauri::State<'_, AppState>) -> Result<(), String> { + rtc_peer::close_peer(state.rtc.clone()).await +} + +#[tauri::command] +pub async fn rtc_get_snapshot(state: tauri::State<'_, AppState>) -> Result { + state.rtc.snapshot() +} diff --git a/src/rtc_peer.rs b/src/rtc_peer.rs new file mode 100644 index 0000000..01e9317 --- /dev/null +++ b/src/rtc_peer.rs @@ -0,0 +1,524 @@ +// file: src/rtc_peer.rs + +use crate::rtc_state::RtcAppState; +use crate::rtc_types::{ClientWsMessage, RtcAction}; +use std::sync::Arc; +use tokio::sync::mpsc; +use webrtc::api::APIBuilder; +use webrtc::api::interceptor_registry::register_default_interceptors; +use webrtc::api::media_engine::MediaEngine; +use webrtc::data_channel::RTCDataChannel; +use webrtc::data_channel::data_channel_init::RTCDataChannelInit; +use webrtc::data_channel::data_channel_message::DataChannelMessage; +use webrtc::ice_transport::ice_candidate::RTCIceCandidate; +use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit; +use webrtc::interceptor::registry::Registry; +use webrtc::peer_connection::RTCPeerConnection; +use webrtc::peer_connection::configuration::RTCConfiguration; +use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState; +use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; + +pub async fn new_peer_connection( + rtc_state: RtcAppState, + outbound_tx: mpsc::UnboundedSender, + remote_peer_id: String, +) -> Result, String> { + let mut media_engine = MediaEngine::default(); + let register_codecs_result = media_engine.register_default_codecs(); + if let Err(error) = register_codecs_result { + return Err(format!("register_default_codecs failed: {error}")); + } + + let registry = Registry::new(); + let register_interceptors_result = register_default_interceptors(registry, &mut media_engine); + let registry = match register_interceptors_result { + Ok(value) => value, + Err(error) => { + return Err(format!("register_default_interceptors failed: {error}")); + } + }; + + let api = APIBuilder::new() + .with_media_engine(media_engine) + .with_interceptor_registry(registry) + .build(); + + let config = RTCConfiguration { + ice_servers: vec![], + ..Default::default() + }; + + let new_pc_result = api.new_peer_connection(config).await; + let peer_connection = match new_pc_result { + Ok(value) => Arc::new(value), + Err(error) => { + return Err(format!("new_peer_connection failed: {error}")); + } + }; + + attach_peer_connection_handlers( + rtc_state.clone(), + Arc::clone(&peer_connection), + outbound_tx, + remote_peer_id, + ) + .await?; + + { + let lock_result = rtc_state.runtime.lock(); + match lock_result { + Ok(mut runtime_guard) => { + runtime_guard.peer_connection = Some(Arc::clone(&peer_connection)); + } + Err(_) => { + return Err("rtc runtime state lock poisoned".to_string()); + } + } + } + + Ok(peer_connection) +} + +pub async fn create_offer_for_target( + rtc_state: RtcAppState, + target_peer_id: String, +) -> Result<(), String> { + let signalling_tx = { + let lock_result = rtc_state.runtime.lock(); + match lock_result { + Ok(runtime_guard) => { + let tx_option = runtime_guard.signalling_tx.clone(); + match tx_option { + Some(value) => value, + None => { + return Err("signalling is not connected".to_string()); + } + } + } + Err(_) => { + return Err("rtc runtime state lock poisoned".to_string()); + } + } + }; + + let peer_connection = new_peer_connection( + rtc_state.clone(), + signalling_tx.clone(), + target_peer_id.clone(), + ) + .await?; + + rtc_state.set_active_remote_peer_id(Some(target_peer_id.clone())); + rtc_state.set_rtc_status("creating offer".to_string()); + + let data_channel_result = peer_connection + .create_data_channel("chat", Some(RTCDataChannelInit::default())) + .await; + + let data_channel = match data_channel_result { + Ok(value) => value, + Err(error) => { + return Err(format!("create_data_channel failed: {error}")); + } + }; + + attach_data_channel_handlers(rtc_state.clone(), Arc::clone(&data_channel)).await?; + + { + let lock_result = rtc_state.runtime.lock(); + match lock_result { + Ok(mut runtime_guard) => { + runtime_guard.data_channel = Some(Arc::clone(&data_channel)); + } + Err(_) => { + return Err("rtc runtime state lock poisoned".to_string()); + } + } + } + + let offer_result = peer_connection.create_offer(None).await; + let offer = match offer_result { + Ok(value) => value, + Err(error) => { + return Err(format!("create_offer failed: {error}")); + } + }; + + let set_local_result = peer_connection.set_local_description(offer).await; + if let Err(error) = set_local_result { + return Err(format!("set_local_description(offer) failed: {error}")); + } + + let local_description = peer_connection.local_description().await; + let local_description_value = match local_description { + Some(value) => value, + None => { + return Err("local_description is None after create_offer".to_string()); + } + }; + + let send_result = signalling_tx.send(RtcAction::WsSend(ClientWsMessage::Offer { + target_peer_id, + sdp: local_description_value.sdp, + })); + + if let Err(error) = send_result { + return Err(format!("failed to enqueue offer for signalling: {error}")); + } + + rtc_state.push_log("rtc offer enqueued".to_string()); + rtc_state.set_rtc_status("offer sent".to_string()); + + Ok(()) +} + +pub async fn handle_remote_offer( + rtc_state: RtcAppState, + from_peer_id: String, + sdp: String, +) -> Result<(), String> { + let signalling_tx = { + let lock_result = rtc_state.runtime.lock(); + match lock_result { + Ok(runtime_guard) => { + let tx_option = runtime_guard.signalling_tx.clone(); + match tx_option { + Some(value) => value, + None => { + return Err("signalling is not connected".to_string()); + } + } + } + Err(_) => { + return Err("rtc runtime state lock poisoned".to_string()); + } + } + }; + + let peer_connection = new_peer_connection( + rtc_state.clone(), + signalling_tx.clone(), + from_peer_id.clone(), + ) + .await?; + + rtc_state.set_active_remote_peer_id(Some(from_peer_id.clone())); + rtc_state.set_rtc_status("received offer".to_string()); + + let remote_desc_result = RTCSessionDescription::offer(sdp); + let remote_desc = match remote_desc_result { + Ok(value) => value, + Err(error) => { + return Err(format!("RTCSessionDescription::offer failed: {error}")); + } + }; + + let set_remote_result = peer_connection.set_remote_description(remote_desc).await; + if let Err(error) = set_remote_result { + return Err(format!("set_remote_description(offer) failed: {error}")); + } + + let answer_result = peer_connection.create_answer(None).await; + let answer = match answer_result { + Ok(value) => value, + Err(error) => { + return Err(format!("create_answer failed: {error}")); + } + }; + + let set_local_result = peer_connection.set_local_description(answer).await; + if let Err(error) = set_local_result { + return Err(format!("set_local_description(answer) failed: {error}")); + } + + let local_description = peer_connection.local_description().await; + let local_description_value = match local_description { + Some(value) => value, + None => { + return Err("local_description is None after create_answer".to_string()); + } + }; + + let send_result = signalling_tx.send(RtcAction::WsSend(ClientWsMessage::Answer { + target_peer_id: from_peer_id, + sdp: local_description_value.sdp, + })); + + if let Err(error) = send_result { + return Err(format!("failed to enqueue answer for signalling: {error}")); + } + + rtc_state.push_log("rtc answer enqueued".to_string()); + rtc_state.set_rtc_status("answer sent".to_string()); + + Ok(()) +} + +pub async fn handle_remote_answer( + rtc_state: RtcAppState, + from_peer_id: String, + sdp: String, +) -> Result<(), String> { + let peer_connection = { + let lock_result = rtc_state.runtime.lock(); + match lock_result { + Ok(runtime_guard) => match runtime_guard.peer_connection.clone() { + Some(value) => value, + None => { + return Err(format!("no peer connection for answer from {from_peer_id}")); + } + }, + Err(_) => { + return Err("rtc runtime state lock poisoned".to_string()); + } + } + }; + + let remote_desc_result = RTCSessionDescription::answer(sdp); + let remote_desc = match remote_desc_result { + Ok(value) => value, + Err(error) => { + return Err(format!("RTCSessionDescription::answer failed: {error}")); + } + }; + + let set_remote_result = peer_connection.set_remote_description(remote_desc).await; + if let Err(error) = set_remote_result { + return Err(format!("set_remote_description(answer) failed: {error}")); + } + + rtc_state.push_log(format!("rtc answer applied from {from_peer_id}")); + rtc_state.set_rtc_status("answer applied".to_string()); + + Ok(()) +} + +pub async fn handle_remote_ice_candidate( + rtc_state: RtcAppState, + from_peer_id: String, + candidate: String, + sdp_mid: Option, + sdp_mline_index: Option, +) -> Result<(), String> { + let peer_connection = { + let lock_result = rtc_state.runtime.lock(); + match lock_result { + Ok(runtime_guard) => match runtime_guard.peer_connection.clone() { + Some(value) => value, + None => { + return Err(format!( + "no peer connection for ice candidate from {from_peer_id}" + )); + } + }, + Err(_) => { + return Err("rtc runtime state lock poisoned".to_string()); + } + } + }; + + let candidate_init = RTCIceCandidateInit { + candidate, + sdp_mid, + sdp_mline_index, + username_fragment: None, + }; + + let add_result = peer_connection.add_ice_candidate(candidate_init).await; + if let Err(error) = add_result { + return Err(format!("add_ice_candidate failed: {error}")); + } + + Ok(()) +} + +pub async fn send_data_message(rtc_state: RtcAppState, text: String) -> Result<(), String> { + let data_channel = { + let lock_result = rtc_state.runtime.lock(); + match lock_result { + Ok(runtime_guard) => match runtime_guard.data_channel.clone() { + Some(value) => value, + None => { + return Err("data channel is not ready".to_string()); + } + }, + Err(_) => { + return Err("rtc runtime state lock poisoned".to_string()); + } + } + }; + + let send_result = data_channel.send_text(text.clone()).await; + if let Err(error) = send_result { + return Err(format!("send_text failed: {error}")); + } + + rtc_state.push_rtc_message(format!("me: {text}")); + + Ok(()) +} + +pub async fn close_peer(rtc_state: RtcAppState) -> Result<(), String> { + let (peer_connection, data_channel) = { + let lock_result = rtc_state.runtime.lock(); + match lock_result { + Ok(mut runtime_guard) => { + let pc = runtime_guard.peer_connection.take(); + let dc = runtime_guard.data_channel.take(); + (pc, dc) + } + Err(_) => { + return Err("rtc runtime state lock poisoned".to_string()); + } + } + }; + + if let Some(dc) = data_channel { + let close_result = dc.close().await; + if let Err(error) = close_result { + rtc_state.push_log(format!("data channel close failed: {error}")); + } + } + + if let Some(pc) = peer_connection { + let close_result = pc.close().await; + if let Err(error) = close_result { + rtc_state.push_log(format!("peer connection close failed: {error}")); + } + } + + rtc_state.set_data_channel_open(false); + rtc_state.set_active_remote_peer_id(None); + rtc_state.set_rtc_status("closed".to_string()); + + Ok(()) +} + +async fn attach_peer_connection_handlers( + rtc_state: RtcAppState, + peer_connection: Arc, + outbound_tx: mpsc::UnboundedSender, + remote_peer_id: String, +) -> Result<(), String> { + let rtc_state_for_state = rtc_state.clone(); + peer_connection.on_peer_connection_state_change(Box::new( + move |state: RTCPeerConnectionState| { + let rtc_state_inner = rtc_state_for_state.clone(); + Box::pin(async move { + rtc_state_inner.push_log(format!("pc state -> {state:?}")); + rtc_state_inner.set_rtc_status(format!("pc:{state:?}")); + }) + }, + )); + + let rtc_state_for_dc = rtc_state.clone(); + peer_connection.on_data_channel(Box::new(move |dc: Arc| { + let rtc_state_inner = rtc_state_for_dc.clone(); + Box::pin(async move { + let attach_result = + attach_data_channel_handlers(rtc_state_inner.clone(), Arc::clone(&dc)).await; + if let Err(error) = attach_result { + rtc_state_inner.push_log(format!("attach remote data channel failed: {error}")); + return; + } + + let lock_result = rtc_state_inner.runtime.lock(); + if let Ok(mut runtime_guard) = lock_result { + runtime_guard.data_channel = Some(dc); + } + }) + })); + + let rtc_state_for_ice = rtc_state.clone(); + peer_connection.on_ice_candidate(Box::new(move |candidate: Option| { + let outbound_tx_inner = outbound_tx.clone(); + let rtc_state_inner = rtc_state_for_ice.clone(); + let remote_peer_id_inner = remote_peer_id.clone(); + + Box::pin(async move { + let candidate_value = match candidate { + Some(value) => value, + None => { + rtc_state_inner.push_log("ice gathering complete".to_string()); + return; + } + }; + + let to_json_result = candidate_value.to_json(); + let candidate_json = match to_json_result { + Ok(value) => value, + Err(error) => { + rtc_state_inner.push_log(format!("candidate to_json failed: {error}")); + return; + } + }; + + let send_result = + outbound_tx_inner.send(RtcAction::WsSend(ClientWsMessage::IceCandidate { + target_peer_id: remote_peer_id_inner, + candidate: candidate_json.candidate, + sdp_mid: candidate_json.sdp_mid, + sdp_mline_index: candidate_json.sdp_mline_index, + })); + + if let Err(error) = send_result { + rtc_state_inner.push_log(format!("enqueue local candidate failed: {error}")); + } + }) + })); + + Ok(()) +} + +async fn attach_data_channel_handlers( + rtc_state: RtcAppState, + data_channel: Arc, +) -> Result<(), String> { + let rtc_state_for_open = rtc_state.clone(); + data_channel.on_open(Box::new(move || { + let rtc_state_inner = rtc_state_for_open.clone(); + Box::pin(async move { + rtc_state_inner.push_log("data channel open".to_string()); + rtc_state_inner.set_data_channel_open(true); + rtc_state_inner.set_rtc_status("datachannel:open".to_string()); + }) + })); + + let rtc_state_for_close = rtc_state.clone(); + data_channel.on_close(Box::new(move || { + let rtc_state_inner = rtc_state_for_close.clone(); + Box::pin(async move { + rtc_state_inner.push_log("data channel closed".to_string()); + rtc_state_inner.set_data_channel_open(false); + rtc_state_inner.set_rtc_status("datachannel:closed".to_string()); + }) + })); + + let rtc_state_for_error = rtc_state.clone(); + data_channel.on_error(Box::new(move |error| { + let rtc_state_inner = rtc_state_for_error.clone(); + Box::pin(async move { + rtc_state_inner.push_log(format!("data channel error: {error}")); + rtc_state_inner.set_data_channel_open(false); + rtc_state_inner.set_rtc_status("datachannel:error".to_string()); + }) + })); + + let rtc_state_for_message = rtc_state.clone(); + data_channel.on_message(Box::new(move |message: DataChannelMessage| { + let rtc_state_inner = rtc_state_for_message.clone(); + Box::pin(async move { + let text_result = String::from_utf8(message.data.to_vec()); + match text_result { + Ok(text) => { + rtc_state_inner.push_rtc_message(format!("remote: {text}")); + } + Err(error) => { + rtc_state_inner.push_log(format!("invalid rtc utf8 message: {error}")); + } + } + }) + })); + + Ok(()) +} diff --git a/src/rtc_signalling.rs b/src/rtc_signalling.rs new file mode 100644 index 0000000..02f36f7 --- /dev/null +++ b/src/rtc_signalling.rs @@ -0,0 +1,282 @@ +// file: src/rtc_signalling.rs + +use crate::app_state::AppState; +use crate::rtc_peer; +use crate::rtc_state::RtcAppState; +use crate::rtc_types::{ClientWsMessage, RtcAction, ServerWsMessage}; +use futures_util::{SinkExt, StreamExt}; +use tokio::sync::mpsc; +use tokio_tungstenite::tungstenite::Message; + +pub async fn connect_signalling( + app_state: tauri::State<'_, AppState>, + server_url: String, + display_name: String, +) -> Result<(), String> { + { + let lock_result = app_state.rtc.runtime.lock(); + match lock_result { + Ok(runtime_guard) => { + if runtime_guard.signalling_tx.is_some() { + return Err("signalling already connected".to_string()); + } + } + Err(_) => { + return Err("rtc runtime state lock poisoned".to_string()); + } + } + } + + app_state + .rtc + .set_server_info(server_url.clone(), display_name.clone()); + app_state + .rtc + .push_log(format!("connecting to {server_url}")); + + let connect_result = tokio_tungstenite::connect_async(server_url.clone()).await; + let (ws_stream, _) = match connect_result { + Ok(value) => value, + Err(error) => { + return Err(format!("connect_async failed: {error}")); + } + }; + + let (mut ws_write, mut ws_read) = ws_stream.split(); + let (tx, mut rx) = mpsc::unbounded_channel::(); + + { + let lock_result = app_state.rtc.runtime.lock(); + match lock_result { + Ok(mut runtime_guard) => { + runtime_guard.signalling_tx = Some(tx.clone()); + } + Err(_) => { + return Err("rtc runtime state lock poisoned".to_string()); + } + } + } + + app_state.rtc.set_signalling_connected(true); + app_state.rtc.push_log("signalling connected".to_string()); + + let hello_send_result = tx.send(RtcAction::WsSend(ClientWsMessage::Hello { + display_name: display_name.clone(), + })); + + if let Err(error) = hello_send_result { + return Err(format!("failed to queue hello: {error}")); + } + + let rtc_state_for_write = app_state.rtc.clone(); + let write_task = tokio::spawn(async move { + while let Some(action) = rx.recv().await { + match action { + RtcAction::Shutdown => { + break; + } + RtcAction::WsSend(message) => { + let json_result = serde_json::to_string(&message); + let json = match json_result { + Ok(value) => value, + Err(error) => { + rtc_state_for_write + .push_log(format!("serialize ws message failed: {error}")); + continue; + } + }; + + let send_result = ws_write.send(Message::Text(json.into())).await; + if let Err(error) = send_result { + rtc_state_for_write.push_log(format!("ws send failed: {error}")); + break; + } + } + } + } + }); + + let rtc_state_for_read = app_state.rtc.clone(); + + tokio::spawn(async move { + while let Some(incoming_result) = ws_read.next().await { + let incoming_message = match incoming_result { + Ok(value) => value, + Err(error) => { + rtc_state_for_read.push_log(format!("ws receive failed: {error}")); + break; + } + }; + + match incoming_message { + Message::Text(text) => { + let parse_result = serde_json::from_str::(&text); + let server_message = match parse_result { + Ok(value) => value, + Err(error) => { + rtc_state_for_read.push_log(format!("invalid server message: {error}")); + continue; + } + }; + + handle_server_message(rtc_state_for_read.clone(), server_message).await; + } + Message::Close(_) => { + rtc_state_for_read.push_log("signalling disconnected".to_string()); + break; + } + Message::Binary(_) => { + rtc_state_for_read.push_log("unexpected binary websocket message".to_string()); + } + Message::Ping(_) => {} + Message::Pong(_) => {} + Message::Frame(_) => {} + } + } + + { + let lock_result = rtc_state_for_read.runtime.lock(); + if let Ok(mut runtime_guard) = lock_result { + runtime_guard.signalling_tx = None; + } + } + + let _ = rtc_peer::close_peer(rtc_state_for_read.clone()).await; + rtc_state_for_read.clear_after_disconnect(); + rtc_state_for_read.push_log("signalling task ended".to_string()); + + write_task.abort(); + }); + + Ok(()) +} + +pub async fn disconnect_signalling(app_state: tauri::State<'_, AppState>) -> Result<(), String> { + let signalling_tx = { + let lock_result = app_state.rtc.runtime.lock(); + match lock_result { + Ok(runtime_guard) => runtime_guard.signalling_tx.clone(), + Err(_) => { + return Err("rtc runtime state lock poisoned".to_string()); + } + } + }; + + if let Some(tx) = signalling_tx { + let send_result = tx.send(RtcAction::Shutdown); + if let Err(error) = send_result { + app_state + .rtc + .push_log(format!("signalling shutdown channel already closed: {error}")); + } + } + + { + let lock_result = app_state.rtc.runtime.lock(); + if let Ok(mut runtime_guard) = lock_result { + runtime_guard.signalling_tx = None; + } + } + + let _ = rtc_peer::close_peer(app_state.rtc.clone()).await; + app_state.rtc.clear_after_disconnect(); + app_state + .rtc + .push_log("signalling disconnected by user".to_string()); + + Ok(()) +} + +async fn handle_server_message(rtc_state: RtcAppState, message: ServerWsMessage) { + match message { + ServerWsMessage::Welcome { peer_id } => { + rtc_state.set_my_peer_id(Some(peer_id.clone())); + rtc_state.push_log(format!("welcome peer_id={peer_id}")); + } + + ServerWsMessage::PeerList { peers } => { + rtc_state.set_peers(peers.clone()); + rtc_state.push_log(format!("peer_list received ({})", peers.len())); + } + + ServerWsMessage::PeerJoined { peer } => { + let snapshot_result = rtc_state.snapshot(); + if let Ok(snapshot) = snapshot_result { + let mut peers = snapshot.peers; + peers.retain(|value| value.peer_id != peer.peer_id); + peers.push(peer.clone()); + peers.sort_by(|a, b| a.display_name.cmp(&b.display_name)); + rtc_state.set_peers(peers); + } + + rtc_state.push_log(format!("peer joined: {}", peer.display_name)); + } + + ServerWsMessage::PeerLeft { peer_id } => { + let snapshot_result = rtc_state.snapshot(); + if let Ok(snapshot) = snapshot_result { + let mut peers = snapshot.peers; + peers.retain(|value| value.peer_id != peer_id); + rtc_state.set_peers(peers); + } + + rtc_state.push_log(format!("peer left: {peer_id}")); + } + + ServerWsMessage::ChatReceive { + from_peer_id: _, + from_display_name, + text, + } => { + rtc_state.push_chat_message(format!("{from_display_name}: {text}")); + } + + ServerWsMessage::Offer { from_peer_id, sdp } => { + rtc_state.push_log(format!("offer received from {from_peer_id}")); + + let result = rtc_peer::handle_remote_offer(rtc_state.clone(), from_peer_id, sdp).await; + if let Err(error) = result { + rtc_state.push_log(format!("handle_remote_offer failed: {error}")); + } + } + + ServerWsMessage::Answer { from_peer_id, sdp } => { + rtc_state.push_log(format!("answer received from {from_peer_id}")); + + let result = rtc_peer::handle_remote_answer(rtc_state.clone(), from_peer_id, sdp).await; + if let Err(error) = result { + rtc_state.push_log(format!("handle_remote_answer failed: {error}")); + } + } + + ServerWsMessage::IceCandidate { + from_peer_id, + candidate, + sdp_mid, + sdp_mline_index, + } => { + let result = rtc_peer::handle_remote_ice_candidate( + rtc_state.clone(), + from_peer_id.clone(), + candidate, + sdp_mid, + sdp_mline_index, + ) + .await; + + if let Err(error) = result { + rtc_state.push_log(format!( + "handle_remote_ice_candidate failed from {from_peer_id}: {error}" + )); + } + } + + ServerWsMessage::Pong => { + rtc_state.push_log("pong".to_string()); + } + + ServerWsMessage::Error { message } => { + rtc_state.push_log(format!("server error: {message}")); + } + } +} diff --git a/src/rtc_state.rs b/src/rtc_state.rs new file mode 100644 index 0000000..a204f5f --- /dev/null +++ b/src/rtc_state.rs @@ -0,0 +1,200 @@ +// file: src/rtc_state.rs + +use crate::rtc_types::{PeerInfo, RtcAction, RtcSnapshot}; +use std::sync::Arc; +use tokio::sync::mpsc; +use webrtc::data_channel::RTCDataChannel; +use webrtc::peer_connection::RTCPeerConnection; + +#[derive(Debug)] +pub struct RtcUiState { + pub signalling_connected: bool, + pub server_url: String, + pub display_name: String, + pub my_peer_id: Option, + pub peers: Vec, + pub logs: Vec, + pub chat_messages: Vec, + pub rtc_messages: Vec, + pub rtc_status: String, + pub data_channel_open: bool, + pub active_remote_peer_id: Option, +} + +impl RtcUiState { + pub fn new() -> Self { + Self { + signalling_connected: false, + server_url: String::new(), + display_name: String::new(), + my_peer_id: None, + peers: Vec::new(), + logs: vec!["Ready.".to_string()], + chat_messages: vec!["No messages yet.".to_string()], + rtc_messages: vec!["No RTC messages yet.".to_string()], + rtc_status: "Idle.".to_string(), + data_channel_open: false, + active_remote_peer_id: None, + } + } + + pub fn snapshot(&self) -> RtcSnapshot { + RtcSnapshot { + signalling_connected: self.signalling_connected, + server_url: self.server_url.clone(), + display_name: self.display_name.clone(), + my_peer_id: self.my_peer_id.clone(), + peers: self.peers.clone(), + logs: self.logs.clone(), + chat_messages: self.chat_messages.clone(), + rtc_messages: self.rtc_messages.clone(), + rtc_status: self.rtc_status.clone(), + data_channel_open: self.data_channel_open, + active_remote_peer_id: self.active_remote_peer_id.clone(), + } + } +} + +pub struct RtcRuntimeState { + pub signalling_tx: Option>, + pub peer_connection: Option>, + pub data_channel: Option>, +} + +impl RtcRuntimeState { + pub fn new() -> Self { + Self { + signalling_tx: None, + peer_connection: None, + data_channel: None, + } + } +} + +#[derive(Clone)] +pub struct RtcAppState { + pub ui: Arc>, + pub runtime: Arc>, +} + +impl RtcAppState { + pub fn new() -> Self { + Self { + ui: Arc::new(std::sync::Mutex::new(RtcUiState::new())), + runtime: Arc::new(std::sync::Mutex::new(RtcRuntimeState::new())), + } + } + + pub fn snapshot(&self) -> Result { + let lock_result = self.ui.lock(); + match lock_result { + Ok(guard) => Ok(guard.snapshot()), + Err(_) => Err("rtc ui state lock poisoned".to_string()), + } + } + + pub fn push_log(&self, line: String) { + let lock_result = self.ui.lock(); + if let Ok(mut guard) = lock_result { + guard.logs.push(line); + if guard.logs.len() > 200 { + let drain_len = guard.logs.len().saturating_sub(200); + guard.logs.drain(0..drain_len); + } + } + } + + pub fn push_chat_message(&self, line: String) { + let lock_result = self.ui.lock(); + if let Ok(mut guard) = lock_result { + if guard.chat_messages.len() == 1 && guard.chat_messages[0] == "No messages yet." { + guard.chat_messages.clear(); + } + + guard.chat_messages.push(line); + + if guard.chat_messages.len() > 200 { + let drain_len = guard.chat_messages.len().saturating_sub(200); + guard.chat_messages.drain(0..drain_len); + } + } + } + + pub fn push_rtc_message(&self, line: String) { + let lock_result = self.ui.lock(); + if let Ok(mut guard) = lock_result { + if guard.rtc_messages.len() == 1 && guard.rtc_messages[0] == "No RTC messages yet." { + guard.rtc_messages.clear(); + } + + guard.rtc_messages.push(line); + + if guard.rtc_messages.len() > 200 { + let drain_len = guard.rtc_messages.len().saturating_sub(200); + guard.rtc_messages.drain(0..drain_len); + } + } + } + + pub fn set_signalling_connected(&self, connected: bool) { + let lock_result = self.ui.lock(); + if let Ok(mut guard) = lock_result { + guard.signalling_connected = connected; + } + } + + pub fn set_server_info(&self, server_url: String, display_name: String) { + let lock_result = self.ui.lock(); + if let Ok(mut guard) = lock_result { + guard.server_url = server_url; + guard.display_name = display_name; + } + } + + pub fn set_my_peer_id(&self, peer_id: Option) { + let lock_result = self.ui.lock(); + if let Ok(mut guard) = lock_result { + guard.my_peer_id = peer_id; + } + } + + pub fn set_peers(&self, peers: Vec) { + let lock_result = self.ui.lock(); + if let Ok(mut guard) = lock_result { + guard.peers = peers; + } + } + + pub fn set_rtc_status(&self, status: String) { + let lock_result = self.ui.lock(); + if let Ok(mut guard) = lock_result { + guard.rtc_status = status; + } + } + + pub fn set_data_channel_open(&self, open: bool) { + let lock_result = self.ui.lock(); + if let Ok(mut guard) = lock_result { + guard.data_channel_open = open; + } + } + + pub fn set_active_remote_peer_id(&self, peer_id: Option) { + let lock_result = self.ui.lock(); + if let Ok(mut guard) = lock_result { + guard.active_remote_peer_id = peer_id; + } + } + + pub fn clear_after_disconnect(&self) { + let lock_result = self.ui.lock(); + if let Ok(mut guard) = lock_result { + guard.signalling_connected = false; + guard.my_peer_id = None; + guard.peers.clear(); + guard.rtc_status = "Idle.".to_string(); + guard.data_channel_open = false; + guard.active_remote_peer_id = None; + } + } +} diff --git a/src/rtc_types.rs b/src/rtc_types.rs new file mode 100644 index 0000000..48f5b88 --- /dev/null +++ b/src/rtc_types.rs @@ -0,0 +1,100 @@ +// file: src/rtc_types.rs + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize, ts_rs::TS)] +#[ts(export, export_to = "../frontend/ts/bindings/")] +pub struct PeerInfo { + pub peer_id: String, + pub display_name: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ts_rs::TS)] +#[ts(export, export_to = "../frontend/ts/bindings/")] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum ClientWsMessage { + Hello { + display_name: String, + }, + ChatSend { + text: String, + }, + Offer { + target_peer_id: String, + sdp: String, + }, + Answer { + target_peer_id: String, + sdp: String, + }, + IceCandidate { + target_peer_id: String, + candidate: String, + sdp_mid: Option, + sdp_mline_index: Option, + }, + Ping, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ts_rs::TS)] +#[ts(export, export_to = "../frontend/ts/bindings/")] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum ServerWsMessage { + Welcome { + peer_id: String, + }, + PeerList { + peers: Vec, + }, + PeerJoined { + peer: PeerInfo, + }, + PeerLeft { + peer_id: String, + }, + ChatReceive { + from_peer_id: String, + from_display_name: String, + text: String, + }, + Offer { + from_peer_id: String, + sdp: String, + }, + Answer { + from_peer_id: String, + sdp: String, + }, + IceCandidate { + from_peer_id: String, + candidate: String, + sdp_mid: Option, + sdp_mline_index: Option, + }, + Pong, + Error { + message: String, + }, +} + +#[derive(Debug, Clone)] +pub enum RtcAction { + WsSend(ClientWsMessage), + Shutdown, +} + +#[derive(Debug, Clone, Serialize, ts_rs::TS)] +#[ts(export, export_to = "../frontend/ts/bindings/")] +pub struct RtcSnapshot { + pub signalling_connected: bool, + pub server_url: String, + pub display_name: String, + pub my_peer_id: Option, + pub peers: Vec, + pub logs: Vec, + pub chat_messages: Vec, + pub rtc_messages: Vec, + pub rtc_status: String, + pub data_channel_open: bool, + pub active_remote_peer_id: Option, +} diff --git a/tauri.conf.json b/tauri.conf.json index f8275ab..e71b41e 100644 --- a/tauri.conf.json +++ b/tauri.conf.json @@ -1,7 +1,7 @@ { "$schema": "https://schema.tauri.app/config/2", "productName": "tauri-video03", - "version": "0.1.1", + "version": "0.2.1", "identifier": "com.sinus.tauri-video03", "build": { "beforeDevCommand": "npm run dev",