feat: realtime heartbeat by loks0n · Pull Request #1011 · appwrite/sdk-generator · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat: realtime heartbeat #1011

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
private companion object {
private const val TYPE_ERROR = "error"
private const val TYPE_EVENT = "event"
private const val TYPE_PONG = "pong"
private const val HEARTBEAT_INTERVAL = 20_000L // 20 seconds

private const val DEBOUNCE_MILLIS = 1L

Expand All @@ -40,6 +42,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
private var reconnectAttempts = 0
private var subscriptionsCounter = 0
private var reconnect = true
private var heartbeatJob: Job? = null
}

private fun createSocket() {
Expand Down Expand Up @@ -80,9 +83,25 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
}

/**
 * Stops the heartbeat and then closes the current socket, if there is one.
 *
 * The socket is closed with the [RealtimeCode.POLICY_VIOLATION] code and no
 * reason text.
 */
private fun closeSocket() {
// Stop pinging before the socket goes away.
stopHeartbeat()
socket?.close(RealtimeCode.POLICY_VIOLATION.value, null)
}

/**
 * (Re)starts the heartbeat coroutine.
 *
 * Any heartbeat job that is already stored is stopped first. The new job then
 * loops while it is active: it waits [HEARTBEAT_INTERVAL] and afterwards sends
 * a JSON ping frame on the socket, if a socket is currently set.
 */
private fun startHeartbeat() {
    // Guarantee that at most one heartbeat loop is running.
    stopHeartbeat()

    val pingFrame = """{"type":"ping"}"""
    heartbeatJob = launch {
        while (isActive) {
            // Wait first, so the first ping goes out one interval after start.
            delay(HEARTBEAT_INTERVAL)
            socket?.send(pingFrame)
        }
    }
}

/**
 * Cancels the running heartbeat job, if any, and clears the stored reference.
 *
 * Does nothing beyond resetting the reference when no job is stored.
 */
private fun stopHeartbeat() {
heartbeatJob?.cancel()
heartbeatJob = null
}

private fun getTimeout() = when {
reconnectAttempts < 5 -> 1000L
reconnectAttempts < 15 -> 5000L
Expand Down Expand Up @@ -145,6 +164,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
/**
 * Called when the web socket has been opened.
 *
 * Resets the reconnect attempt counter and starts the heartbeat.
 */
override fun onOpen(webSocket: WebSocket, response: Response) {
super.onOpen(webSocket, response)
reconnectAttempts = 0
startHeartbeat()
}

override fun onMessage(webSocket: WebSocket, text: String) {
Expand Down Expand Up @@ -181,6 +201,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {

override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
super.onClosing(webSocket, code, reason)
stopHeartbeat()
if (!reconnect || code == RealtimeCode.POLICY_VIOLATION.value) {
reconnect = true
return
Expand All @@ -203,6 +224,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {

/**
 * Called when the web socket reports a failure.
 *
 * Stops the heartbeat and prints the stack trace of [t].
 */
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
super.onFailure(webSocket, t, response)
stopHeartbeat()
t.printStackTrace()
}
}
Expand Down
26 changes: 25 additions & 1 deletion templates/flutter/lib/src/realtime_mixin.dart.twig
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,33 @@ mixin RealtimeMixin {
int _retries = 0;
StreamSubscription? _websocketSubscription;
bool _creatingSocket = false;
Timer? _heartbeatTimer;

/// Shuts down the realtime connection and resets the connection state.
///
/// Stops the heartbeat, cancels the websocket subscription, closes the
/// websocket sink with a normal-closure status, and then clears the last URL,
/// the retry counter and the reconnect flag.
Future<dynamic> _closeConnection() async {
// Stop pinging before the socket is torn down.
_stopHeartbeat();
await _websocketSubscription?.cancel();
await _websok?.sink.close(status.normalClosure, 'Ending session');
_lastUrl = null;
_retries = 0;
_reconnect = false;
}

/// (Re)starts the periodic heartbeat.
///
/// Any existing heartbeat timer is stopped first. The new timer fires every
/// 20 seconds and, if a websocket is currently set, adds a JSON-encoded
/// `{"type": "ping"}` message to its sink.
void _startHeartbeat() {
  // Guarantee that at most one heartbeat timer is running.
  _stopHeartbeat();

  const interval = Duration(seconds: 20);
  _heartbeatTimer = Timer.periodic(interval, (_) {
    // Null-aware call: nothing is encoded or sent when there is no socket.
    _websok?.sink.add(jsonEncode({"type": "ping"}));
  });
}

/// Cancels the heartbeat timer, if any, and clears the stored reference.
void _stopHeartbeat() {
_heartbeatTimer?.cancel();
_heartbeatTimer = null;
}

_createSocket() async {
if(_creatingSocket || _channels.isEmpty) return;
_creatingSocket = true;
Expand Down Expand Up @@ -78,6 +96,10 @@ mixin RealtimeMixin {
}));
}
}
_startHeartbeat(); // Start heartbeat after successful connection
break;
case 'pong':
debugPrint('Received heartbeat response from realtime server');
break;
case 'event':
final message = RealtimeMessage.fromMap(data.data);
Expand All @@ -91,8 +113,10 @@ mixin RealtimeMixin {
break;
}
}, onDone: () {
_stopHeartbeat();
_retry();
}, onError: (err, stack) {
_stopHeartbeat();
for (var subscription in _subscriptions.values) {
subscription.controller.addError(err, stack);
}
Expand Down Expand Up @@ -187,4 +211,4 @@ mixin RealtimeMixin {
_retry();
}
}
}
}
33 changes: 33 additions & 0 deletions templates/swift/Sources/Services/Realtime.swift.twig
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ open class Realtime : Service {

private let TYPE_ERROR = "error"
private let TYPE_EVENT = "event"
private let TYPE_PONG = "pong"
private let DEBOUNCE_NANOS = 1_000_000
private let HEARTBEAT_INTERVAL: UInt64 = 20_000_000_000 // 20 seconds in nanoseconds

private var socketClient: WebSocketClient? = nil
private var activeChannels = Set<String>()
private var activeSubscriptions = [Int: RealtimeCallback]()
private var heartbeatTask: Task<Void, Swift.Error>? = nil

let connectSync = DispatchQueue(label: "ConnectSync")

Expand All @@ -20,6 +23,29 @@ open class Realtime : Service {
private var subscriptionsCounter = 0
private var reconnect = true

/// Starts the heartbeat loop, replacing any loop that is already running.
///
/// On each pass the loop sends a `{"type": "ping"}` text frame if a socket
/// client exists and reports itself connected, then sleeps for
/// `HEARTBEAT_INTERVAL` nanoseconds. The loop ends when the task is cancelled
/// (see `stopHeartbeat()`) or once this instance has been deallocated.
private func startHeartbeat() {
    // Guarantee that at most one heartbeat task is running.
    stopHeartbeat()

    // Read the interval up front so the task body does not need `self` for it.
    let interval = HEARTBEAT_INTERVAL

    // `self` owns the task through `heartbeatTask`. Capturing `self` strongly
    // inside a loop that only ends on cancellation would keep both alive
    // until `stopHeartbeat()` is called, so capture it weakly instead.
    heartbeatTask = Task { [weak self] in
        do {
            while !Task.isCancelled {
                // Stop pinging once the owning instance is gone.
                if self == nil {
                    return
                }
                if let client = self?.socketClient, client.isConnected {
                    client.send(text: #"{"type": "ping"}"#)
                }
                // Throws when the task is cancelled while sleeping.
                try await Task.sleep(nanoseconds: interval)
            }
        } catch {
            // A cancelled sleep is the normal way to stop; only report other errors.
            if !Task.isCancelled {
                print("Heartbeat task failed: \(error.localizedDescription)")
            }
        }
    }
}

/// Cancels the heartbeat task, if any, and clears the stored reference.
private func stopHeartbeat() {
heartbeatTask?.cancel()
heartbeatTask = nil
}

private func createSocket() async throws {
guard activeChannels.count > 0 else {
reconnect = false
Expand Down Expand Up @@ -50,6 +76,8 @@ open class Realtime : Service {
}

private func closeSocket() async throws {
stopHeartbeat()

guard let client = socketClient,
let group = client.threadGroup else {
return
Expand Down Expand Up @@ -163,6 +191,7 @@ extension Realtime: WebSocketClientDelegate {

/// Delegate callback for an opened connection.
///
/// Resets the reconnect attempt counter and starts the heartbeat.
public func onOpen(channel: Channel) {
self.reconnectAttempts = 0
startHeartbeat()
}

public func onMessage(text: String) {
Expand All @@ -172,13 +201,16 @@ extension Realtime: WebSocketClientDelegate {
switch type {
case TYPE_ERROR: try! handleResponseError(from: json)
case TYPE_EVENT: handleResponseEvent(from: json)
case TYPE_PONG: break // Handle pong response if needed
default: break
}
}
}
}

public func onClose(channel: Channel, data: Data) async throws {
stopHeartbeat()

if (!reconnect) {
reconnect = true
return
Expand All @@ -196,6 +228,7 @@ extension Realtime: WebSocketClientDelegate {
}

/// Delegate callback for a connection error.
///
/// Stops the heartbeat and prints the error's localized description, or
/// "Unknown error" when no error was supplied.
public func onError(error: Swift.Error?, status: HTTPResponseStatus?) {
stopHeartbeat()
print(error?.localizedDescription ?? "Unknown error")
}

Expand Down
Loading
0