diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 24ebc318..f2878fb0 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -28,6 +28,7 @@ okio = "3.16.0" # https://mvnrepository.com/artifact/com.squareup.okio/okio # Testing Lib versions commons-io = "2.21.0" # https://mvnrepository.com/artifact/commons-io/commons-io commonsCodec = "1.20.0" # https://mvnrepository.com/artifact/commons-codec/commons-codec +coroutinesTest = "1.10.2" # https://mvnrepository.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-test junit5 = "5.13.4" # https://mvnrepository.com/artifact/org.junit/junit-bom mockWebServer = "5.1.0" # https://mvnrepository.com/artifact/com.squareup.okhttp3/mockwebserver3-junit5 mockitoVersion = "5.18.0" # https://mvnrepository.com/artifact/org.mockito/mockito-core @@ -52,8 +53,8 @@ xz = { module = "org.tukaani:xz", version.ref = "xz" } zstd = { module = "com.github.luben:zstd-jni", version.ref = "zstd" } # Depot Downloader -kotlin-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlin-serialization-json"} -okio = { module = "com.squareup.okio:okio", version.ref = "okio"} +kotlin-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlin-serialization-json" } +okio = { module = "com.squareup.okio:okio", version.ref = "okio" } # Tests test-commons-codec = { module = "commons-codec:commons-codec", version.ref = "commonsCodec" } @@ -61,6 +62,7 @@ test-commons-io = { module = "commons-io:commons-io", version.ref = "commons-io" test-mock-core = { module = "org.mockito:mockito-core", version.ref = "mockitoVersion" } test-mock-jupiter = { module = "org.mockito:mockito-junit-jupiter", version.ref = "mockitoVersion" } test-mock-webserver3 = { module = "com.squareup.okhttp3:mockwebserver3-junit5", version.ref = "mockWebServer" } +tests-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-test", version.ref = "coroutinesTest" } tests-junit-bom = { module = "org.junit:junit-bom", version.ref = "junit5" } tests-junit-jupiter = { module = "org.junit.jupiter:junit-jupiter" } tests-junit-platform = { module = "org.junit.platform:junit-platform-launcher" } @@ -81,14 +83,15 @@ protobuf-gradle = { id = "com.google.protobuf", version.ref = "protobuf-gradle" [bundles] testing = [ "bouncyCastle", - "zstd", - "xz", "test-commons-codec", "test-commons-io", "test-mock-core", "test-mock-jupiter", "test-mock-webserver3", + "tests-coroutines", "tests-junit-jupiter", + "xz", + "zstd", ] ktor = [ diff --git a/src/main/java/in/dragonbra/javasteam/steam/handlers/steamgameserver/LogOnDetails.kt b/src/main/java/in/dragonbra/javasteam/steam/handlers/steamgameserver/LogOnDetails.kt index c866e7a5..22ff38b7 100644 --- a/src/main/java/in/dragonbra/javasteam/steam/handlers/steamgameserver/LogOnDetails.kt +++ b/src/main/java/in/dragonbra/javasteam/steam/handlers/steamgameserver/LogOnDetails.kt @@ -6,4 +6,4 @@ package `in`.dragonbra.javasteam.steam.handlers.steamgameserver * @param token Gets or sets the authentication token used to log in as a game server. * @param appID Gets or sets the AppID this gameserver will serve. */ -data class LogOnDetails(var token: String?, var appID: Int) +data class LogOnDetails(var token: String, var appID: Int) diff --git a/src/main/java/in/dragonbra/javasteam/steam/handlers/steamgameserver/SteamGameServer.kt b/src/main/java/in/dragonbra/javasteam/steam/handlers/steamgameserver/SteamGameServer.kt index 40760c45..f9b802bc 100644 --- a/src/main/java/in/dragonbra/javasteam/steam/handlers/steamgameserver/SteamGameServer.kt +++ b/src/main/java/in/dragonbra/javasteam/steam/handlers/steamgameserver/SteamGameServer.kt @@ -18,10 +18,13 @@ import `in`.dragonbra.javasteam.steam.handlers.steamgameserver.callback.TicketAu import `in`.dragonbra.javasteam.steam.handlers.steamuser.callback.LoggedOnCallback import `in`.dragonbra.javasteam.steam.steamclient.callbackmgr.CallbackMsg import `in`.dragonbra.javasteam.steam.steamclient.callbacks.DisconnectedCallback +import `in`.dragonbra.javasteam.types.AsyncJobSingle import `in`.dragonbra.javasteam.types.SteamID import `in`.dragonbra.javasteam.util.HardwareUtils import `in`.dragonbra.javasteam.util.NetHelpers +import `in`.dragonbra.javasteam.util.NetHelpers.obfuscatePrivateIP import `in`.dragonbra.javasteam.util.Utils +import `in`.dragonbra.javasteam.util.obfuscatePrivateIP import java.net.Inet6Address /** @@ -34,26 +37,27 @@ class SteamGameServer : ClientMsgHandler() { * Logs onto the Steam network as a persistent game server. * The client should already have been connected at this point. * Results are return in a [LoggedOnCallback]. - * * @param details The details to use for logging on. + * @return The Job ID of the request. This can be used to fine the appropriate [LoggedOnCallback] + * @throws IllegalArgumentException token is not set within [details]. */ - fun logOn(details: LogOnDetails) { - require(!details.token.isNullOrEmpty()) { "LogOn requires a game server token to be set in 'details'." } + @Throws(IllegalArgumentException::class) + fun logOn(details: LogOnDetails): AsyncJobSingle { + require(!details.token.isBlank()) { "LogOn requires a game server token to be set in 'details'." } + + val logon = ClientMsgProtobuf(CMsgClientLogon::class.java, EMsg.ClientLogonGameServer) if (!client.isConnected) { - LoggedOnCallback(EResult.NoConnection).also(client::postCallback) - return + client.postCallback(LoggedOnCallback(EResult.NoConnection, logon.sourceJobID)) + return AsyncJobSingle(client, logon.sourceJobID) } - val logon = ClientMsgProtobuf(CMsgClientLogon::class.java, EMsg.ClientLogonGameServer) - val gsId = SteamID(0, 0, client.universe, EAccountType.GameServer) logon.protoHeader.clientSessionid = 0 logon.protoHeader.steamid = gsId.convertToUInt64() - val localIp: CMsgIPAddress = NetHelpers.getMsgIPAddress(client.localIP!!) - logon.body.obfuscatedPrivateIp = NetHelpers.obfuscatePrivateIP(localIp) + logon.body.obfuscatedPrivateIp = NetHelpers.getMsgIPAddress(client.localIP!!).obfuscatePrivateIP() logon.body.protocolVersion = MsgClientLogon.CurrentProtocol @@ -64,6 +68,8 @@ class SteamGameServer : ClientMsgHandler() { logon.body.gameServerToken = details.token client.send(logon) + + return AsyncJobSingle(client, logon.sourceJobID) } /** @@ -72,23 +78,23 @@ class SteamGameServer : ClientMsgHandler() { * Results are return in a [LoggedOnCallback]. * * @param appId The AppID served by this game server, or 0 for the default. + * @return The Job ID of the request. This can be used to fine the appropriate [LoggedOnCallback] */ @JvmOverloads - fun logOnAnonymous(appId: Int = 0) { + fun logOnAnonymous(appId: Int = 0): AsyncJobSingle { + val logon = ClientMsgProtobuf(CMsgClientLogon::class.java, EMsg.ClientLogonGameServer) + if (!client.isConnected) { - client.postCallback(LoggedOnCallback(EResult.NoConnection)) - return + client.postCallback(LoggedOnCallback(EResult.NoConnection, logon.sourceJobID)) + return AsyncJobSingle(client, logon.sourceJobID) } - val logon = ClientMsgProtobuf(CMsgClientLogon::class.java, EMsg.ClientLogonGameServer) - val gsId = SteamID(0, 0, client.universe, EAccountType.AnonGameServer) logon.protoHeader.clientSessionid = 0 logon.protoHeader.steamid = gsId.convertToUInt64() - val localIp: CMsgIPAddress = NetHelpers.getMsgIPAddress(client.localIP!!) - logon.body.obfuscatedPrivateIp = NetHelpers.obfuscatePrivateIP(localIp) + logon.body.obfuscatedPrivateIp = NetHelpers.getMsgIPAddress(client.localIP!!).obfuscatePrivateIP() logon.body.protocolVersion = MsgClientLogon.CurrentProtocol @@ -97,6 +103,8 @@ class SteamGameServer : ClientMsgHandler() { logon.body.machineId = ByteString.copyFrom(HardwareUtils.getMachineID()) client.send(logon) + + return AsyncJobSingle(client, logon.sourceJobID) } /** diff --git a/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuser/SteamUser.kt b/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuser/SteamUser.kt index 3bdb414d..fbe37795 100644 --- a/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuser/SteamUser.kt +++ b/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuser/SteamUser.kt @@ -26,6 +26,7 @@ import `in`.dragonbra.javasteam.steam.handlers.steamuser.callback.WalletInfoCall import `in`.dragonbra.javasteam.steam.handlers.steamuser.callback.WebAPIUserNonceCallback import `in`.dragonbra.javasteam.steam.steamclient.callbackmgr.CallbackMsg import `in`.dragonbra.javasteam.steam.steamclient.callbacks.DisconnectedCallback +import `in`.dragonbra.javasteam.types.AsyncJobSingle import `in`.dragonbra.javasteam.types.SteamID import `in`.dragonbra.javasteam.util.HardwareUtils import `in`.dragonbra.javasteam.util.JavaSteamAddition @@ -55,21 +56,23 @@ class SteamUser : ClientMsgHandler() { * Logs the client into the Steam3 network. * The client should already have been connected at this point. * Results are returned in a [LoggedOnCallback]. - * * @param details The details to use for logging on. + * @return The Job ID of the request. This can be used to find the appropriate [LoggedOnCallback]/ + * @throws IllegalArgumentException Username or password are not set within [details]. */ - fun logOn(details: LogOnDetails) { + @Throws(IllegalArgumentException::class) + fun logOn(details: LogOnDetails): AsyncJobSingle { if (details.username.isEmpty() || (details.password.isNullOrEmpty() && details.accessToken.isNullOrEmpty())) { throw IllegalArgumentException("LogOn requires a username and password or access token to be set in 'details'.") } + val logon = ClientMsgProtobuf(CMsgClientLogon::class.java, EMsg.ClientLogon) + if (!client.isConnected) { - client.postCallback(LoggedOnCallback(EResult.NoConnection)) - return + client.postCallback(LoggedOnCallback(EResult.NoConnection, logon.sourceJobID)) + return AsyncJobSingle(client, logon.sourceJobID) } - val logon = ClientMsgProtobuf(CMsgClientLogon::class.java, EMsg.ClientLogon) - val steamID = SteamID(details.accountID, details.accountInstance, client.universe, EAccountType.Individual) if (details.loginID != null) { @@ -141,24 +144,26 @@ class SteamUser : ClientMsgHandler() { } client.send(logon) + + return AsyncJobSingle(client, logon.sourceJobID) } /** * Logs the client into the Steam3 network as an anonymous user. * The client should already have been connected at this point. * Results are returned in a [LoggedOnCallback]. - * * @param details The details to use for logging on. + * @return The Job ID of the request. This can be used to find the appropriate [LoggedOnCallback] */ @JvmOverloads - fun logOnAnonymous(details: AnonymousLogOnDetails = AnonymousLogOnDetails()) { + fun logOnAnonymous(details: AnonymousLogOnDetails = AnonymousLogOnDetails()): AsyncJobSingle { + val logon = ClientMsgProtobuf(CMsgClientLogon::class.java, EMsg.ClientLogon) + if (!client.isConnected) { - client.postCallback(LoggedOnCallback(EResult.NoConnection)) - return + client.postCallback(LoggedOnCallback(EResult.NoConnection, logon.sourceJobID)) + return AsyncJobSingle(client, logon.sourceJobID) } - val logon = ClientMsgProtobuf(CMsgClientLogon::class.java, EMsg.ClientLogon) - val auId = SteamID(0, 0, client.universe, EAccountType.AnonUser) logon.protoHeader.clientSessionid = 0 @@ -172,6 +177,8 @@ class SteamUser : ClientMsgHandler() { logon.body.machineId = ByteString.copyFrom(HardwareUtils.getMachineID()) client.send(logon) + + return AsyncJobSingle(client, logon.sourceJobID) } /** diff --git a/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuser/callback/LoggedOffCallback.kt b/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuser/callback/LoggedOffCallback.kt index 25527cf6..5eac8344 100644 --- a/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuser/callback/LoggedOffCallback.kt +++ b/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuser/callback/LoggedOffCallback.kt @@ -24,9 +24,11 @@ class LoggedOffCallback(packetMsg: IPacketMsg) : CallbackMsg() { SteammessagesClientserverLogin.CMsgClientLoggedOff::class.java, packetMsg ) + jobID = loggedOff.targetJobID result = EResult.from(loggedOff.body.eresult) } else { val loggedOff = ClientMsg(MsgClientLoggedOff::class.java, packetMsg) + jobID = loggedOff.targetJobID result = loggedOff.body.result } } diff --git a/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuser/callback/LoggedOnCallback.kt b/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuser/callback/LoggedOnCallback.kt index bfe64be7..9f2bfe19 100644 --- a/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuser/callback/LoggedOnCallback.kt +++ b/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuser/callback/LoggedOnCallback.kt @@ -12,6 +12,7 @@ import `in`.dragonbra.javasteam.protobufs.steamclient.SteammessagesParentalObjec import `in`.dragonbra.javasteam.steam.handlers.steamuser.LogOnDetails import `in`.dragonbra.javasteam.steam.handlers.steamuser.SteamUser import `in`.dragonbra.javasteam.steam.steamclient.callbackmgr.CallbackMsg +import `in`.dragonbra.javasteam.types.JobID import `in`.dragonbra.javasteam.types.SteamID import `in`.dragonbra.javasteam.util.NetHelpers import `in`.dragonbra.javasteam.util.log.LogManager @@ -161,6 +162,7 @@ class LoggedOnCallback : CallbackMsg { ) val resp = loginResp.body + jobID = loginResp.targetJobID result = EResult.from(resp.eresult) extendedResult = EResult.from(resp.eresultExtended) @@ -203,14 +205,16 @@ class LoggedOnCallback : CallbackMsg { clientInstanceId = resp.clientInstanceId } - constructor(result: EResult) { + constructor(result: EResult, jobID: JobID) { this.result = result + this.jobID = jobID } private fun handleNonProtoLogon(packetMsg: IPacketMsg) { val loginResp = ClientMsg(MsgClientLogOnResponse::class.java, packetMsg) val resp = loginResp.body + jobID = loginResp.targetJobID result = resp.result outOfGameSecsPerHeartbeat = resp.outOfGameHeartbeatRateSec diff --git a/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuserstats/SteamUserStats.kt b/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuserstats/SteamUserStats.kt index 239904d2..26f77228 100644 --- a/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuserstats/SteamUserStats.kt +++ b/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuserstats/SteamUserStats.kt @@ -151,6 +151,43 @@ class SteamUserStats : ClientMsgHandler() { return AsyncJobSingle(this.client, msg.sourceJobID) } + /** + * Asks the Steam back-end for a set of rows for the specified users in the leaderboard. + * Results are returned in a [LeaderboardEntriesCallback]. + * The returned [AsyncJobSingle] can also be awaited to retrieve the callback result. + * @param appId The AppID to request leaderboard rows for. + * @param id ID of the leaderboard to view. + * @param users The IDs of each user to request leaderboard rows for. + * @return The Job ID of the request. This can be used to find the appropriate [LeaderboardEntriesCallback]. + */ + fun downloadLeaderboardEntriesForUsers( + appId: Int, + id: Int, + users: List, + ): AsyncJobSingle { + val msg = ClientMsgProtobuf( + CMsgClientLBSGetLBEntries::class.java, + EMsg.ClientLBSGetLBEntries + ).apply { + sourceJobID = client.getNextJobID() + + // routing_appid has to be set correctly to receive a response + protoHeader.routingAppid = appId + + body.appId = appId + body.leaderboardId = id + body.leaderboardDataRequest = ELeaderboardDataRequest.Users.code() + + users.forEach { steamID -> + body.addSteamids(steamID.convertToUInt64()) + } + } + + client.send(msg) + + return AsyncJobSingle(client, msg.sourceJobID) + } + /** * Gets the Stats-Schema for the specified app. This schema includes Global Achievements and Stats, * @param appId The appID of the game. diff --git a/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuserstats/callback/LeaderboardEntriesCallback.kt b/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuserstats/callback/LeaderboardEntriesCallback.kt index 500deb7c..75c82129 100644 --- a/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuserstats/callback/LeaderboardEntriesCallback.kt +++ b/src/main/java/in/dragonbra/javasteam/steam/handlers/steamuserstats/callback/LeaderboardEntriesCallback.kt @@ -9,7 +9,8 @@ import `in`.dragonbra.javasteam.steam.handlers.steamuserstats.SteamUserStats import `in`.dragonbra.javasteam.steam.steamclient.callbackmgr.CallbackMsg /** - * This callback is fired in response to [SteamUserStats.getLeaderboardEntries]. + * This callback is fired in response to [SteamUserStats.getLeaderboardEntries] + * and [SteamUserStats.downloadLeaderboardEntriesForUsers]. */ @Suppress("MemberVisibilityCanBePrivate") class LeaderboardEntriesCallback(packetMsg: IPacketMsg) : CallbackMsg() { diff --git a/src/main/java/in/dragonbra/javasteam/steam/steamclient/SteamClient.kt b/src/main/java/in/dragonbra/javasteam/steam/steamclient/SteamClient.kt index 31a2db15..c0fe4075 100644 --- a/src/main/java/in/dragonbra/javasteam/steam/steamclient/SteamClient.kt +++ b/src/main/java/in/dragonbra/javasteam/steam/steamclient/SteamClient.kt @@ -27,6 +27,8 @@ import `in`.dragonbra.javasteam.steam.steamclient.callbacks.DisconnectedCallback import `in`.dragonbra.javasteam.steam.steamclient.configuration.SteamConfiguration import `in`.dragonbra.javasteam.types.AsyncJob import `in`.dragonbra.javasteam.types.JobID +import `in`.dragonbra.javasteam.types.JobID.Companion.toJobID +import `in`.dragonbra.javasteam.util.JavaSteamAddition import `in`.dragonbra.javasteam.util.log.LogManager import `in`.dragonbra.javasteam.util.log.Logger import kotlinx.coroutines.CoroutineScope @@ -37,6 +39,7 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withTimeoutOrNull import java.util.* import java.util.concurrent.atomic.AtomicLong +import kotlin.time.Duration.Companion.milliseconds /** * Represents a single client that connects to the Steam3 network. @@ -152,7 +155,6 @@ class SteamClient @JvmOverloads constructor( /** * Returns a registered handler. - * * @param type The type of the handler to cast to. Must derive from ClientMsgHandler. * @param T The type of the handler to cast to. Must derive from ClientMsgHandler. * @return A registered handler on success, or null if the handler could not be found. @@ -163,11 +165,32 @@ class SteamClient @JvmOverloads constructor( /** * Kotlin Helper: * Returns a registered handler. - * * @param T The type of the handler to cast to. Must derive from ClientMsgHandler. * @return A registered handler on success, or null if the handler could not be found. */ + @JavaSteamAddition inline fun getHandler(): T? = getHandler(T::class.java) + + /** + * Returns a registered handler, throwing if not found. + * @param type The type of the handler to cast to. Must derive from ClientMsgHandler. + * @return A registered handler. + * @throws IllegalArgumentException No handler of type [T] is registered. + */ + @Throws(IllegalArgumentException::class) + fun getRequiredHandler(type: Class): T = + getHandler(type) ?: throw IllegalArgumentException("No handler found for type ${type.name}") + + /** + * Kotlin Helper: + * Returns a registered handler, throwing if not found. + * @param T The type of the handler to cast to. Must derive from ClientMsgHandler. + * @return A registered handler. + * @throws IllegalArgumentException No handler of type [T] is registered. + */ + @JavaSteamAddition + @Throws(IllegalArgumentException::class) + inline fun getRequiredHandler() = getRequiredHandler(T::class.java) //endregion //region Callbacks @@ -197,11 +220,16 @@ class SteamClient @JvmOverloads constructor( * @param timeout The length of time to block in ms. * @return A callback object from the queue if a callback has been posted, or null if the timeout has elapsed. */ - fun waitForCallback(timeout: Long): CallbackMsg? = runBlocking { - withTimeoutOrNull(timeout) { - callbackQueue.receive() + fun waitForCallback(timeout: Long): CallbackMsg? = + if (timeout <= 0L) { + callbackQueue.tryReceive().getOrNull() + } else { + runBlocking { + withTimeoutOrNull(timeout.milliseconds) { + callbackQueue.receive() + } + } } - } /** * Posts a callback to the queue. This is normally used directly by client message handlers. @@ -279,7 +307,7 @@ class SteamClient @JvmOverloads constructor( jobManager.setTimeoutsEnabled(true) - ConnectedCallback().also(::postCallback) + postCallback(ConnectedCallback()) } /** @@ -303,11 +331,11 @@ class SteamClient @JvmOverloads constructor( } private fun handleJobHeartbeat(packetMsg: IPacketMsg) { - JobID(packetMsg.targetJobID).let(jobManager::heartbeatJob) + jobManager.heartbeatJob(packetMsg.targetJobID.toJobID()) } private fun handleJobFailed(packetMsg: IPacketMsg) { - JobID(packetMsg.targetJobID).let(jobManager::failJob) + jobManager.failJob(packetMsg.targetJobID.toJobID()) } companion object { diff --git a/src/main/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/Callback.kt b/src/main/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/Callback.kt index f7654108..6ac12d5d 100644 --- a/src/main/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/Callback.kt +++ b/src/main/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/Callback.kt @@ -7,7 +7,7 @@ import java.io.Closeable @Suppress("unused") class Callback @JvmOverloads constructor( override val callbackType: Class, - private val onRun: Consumer?, + val onRun: suspend (TCall) -> Unit, private var mgr: CallbackManager? = null, val jobID: JobID = JobID.INVALID, ) : CallbackBase(), @@ -23,10 +23,10 @@ class Callback @JvmOverloads constructor( } @Suppress("UNCHECKED_CAST") - override fun run(callback: Any) { - val cb = callback as TCall - if ((cb.jobID == jobID || jobID == JobID.INVALID) && onRun != null) { - onRun.accept(cb) + override suspend fun run(callback: Any) { + val cb = callback as? TCall ?: return + if (cb.jobID == jobID || jobID == JobID.INVALID) { + onRun(cb) } } } diff --git a/src/main/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackBase.kt b/src/main/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackBase.kt index 39426e21..0e84969a 100644 --- a/src/main/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackBase.kt +++ b/src/main/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackBase.kt @@ -5,6 +5,6 @@ package `in`.dragonbra.javasteam.steam.steamclient.callbackmgr * This is for internal use only, and shouldn't be used directly. */ abstract class CallbackBase { - abstract val callbackType: Class<*> - abstract fun run(callback: Any) + internal abstract val callbackType: Class<*> + internal abstract suspend fun run(callback: Any) } diff --git a/src/main/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackManager.kt b/src/main/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackManager.kt index 79635a1e..7fd9edd3 100644 --- a/src/main/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackManager.kt +++ b/src/main/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackManager.kt @@ -8,17 +8,17 @@ import `in`.dragonbra.javasteam.steam.handlers.steamunifiedmessages.callback.Ser import `in`.dragonbra.javasteam.steam.steamclient.SteamClient import `in`.dragonbra.javasteam.types.JobID import `in`.dragonbra.javasteam.util.compat.Consumer +import kotlinx.coroutines.runBlocking import java.io.Closeable import java.util.* import java.util.concurrent.* /** - * This class is a utility for routing callbacks to function calls. + * A utility for routing callbacks to function calls. * In order to bind callbacks to functions, an instance of this class must be created for the * [SteamClient] instance that will be posting callbacks. * - * @constructor Initializes a new instance of the [CallbackManager] class. - * @param steamClient The [SteamClient] instance to handle the callbacks of. + * @param steamClient The [SteamClient] instance to handle the callbacks of. */ class CallbackManager(private val steamClient: SteamClient) { @@ -27,39 +27,40 @@ class CallbackManager(private val steamClient: SteamClient) { private val steamUnifiedMessages: SteamUnifiedMessages = steamClient.getHandler(SteamUnifiedMessages::class.java)!! /** - * Runs a single queued callback. - * If no callback is queued, this method will instantly return. + * Runs a single queued callback. Returns immediately if no callback is queued. * - * @return true if a callback has been run, false otherwise. + * @return `true` if a callback was run, `false` otherwise. */ fun runCallbacks(): Boolean { val call = steamClient.getCallback() ?: return false - handle(call) return true } /** * Blocks the current thread to run a single queued callback. - * If no callback is queued, the method will block for the given timeout or until a callback becomes available. + * If no callback is queued, blocks for up to [timeout] milliseconds or until one becomes available. + * + * If any asynchronous callbacks are registered, they will be blocked on synchronously. + * Use [runWaitCallbackAsync] to properly await asynchronous callbacks. * - * @param timeout The length of time to block. - * @return true if a callback has been run, false otherwise. + * @param timeout The length of time to block in milliseconds. + * @return `true` if a callback was run, `false` if the timeout elapsed. */ - // TODO: Add Kotlin coroutines version. fun runWaitCallbacks(timeout: Long): Boolean { val call = steamClient.waitForCallback(timeout) ?: return false - handle(call) return true } /** - * Blocks the current thread to run all queued callbacks. - * If no callback is queued, the method will block for the given timeout or until a callback becomes available. - * This method returns once the queue has been emptied. + * Blocks the current thread to run all queued callbacks, then returns once the queue is empty. + * If no callback is queued, blocks for up to [timeout] milliseconds or until one becomes available. * - * @param timeout The length of time to block. + * If any asynchronous callbacks are registered, they will be blocked on synchronously. + * Use [runWaitCallbackAsync] to properly await asynchronous callbacks. + * + * @param timeout The length of time to block in milliseconds. */ fun runWaitAllCallbacks(timeout: Long) { if (!runWaitCallbacks(timeout)) { @@ -73,7 +74,10 @@ class CallbackManager(private val steamClient: SteamClient) { /** * Blocks the current thread to run a single queued callback. - * If no callback is queued, the method will block until one becomes available. + * If no callback is queued, blocks indefinitely until one becomes available. + * + * If any asynchronous callbacks are registered, they will be blocked on synchronously. + * Use [runWaitCallbackAsync] to properly await asynchronous callbacks. */ fun runWaitCallbacks() { val call = steamClient.waitForCallback() @@ -81,40 +85,38 @@ class CallbackManager(private val steamClient: SteamClient) { } /** - * + * Asynchronously awaits a single queued callback. + * If no callback is queued, suspends until one becomes available. */ @Suppress("unused") suspend fun runWaitCallbackAsync() { val call = steamClient.waitForCallbackAsync() - handle(call) + handleAsync(call) } /** - * Registers the provided [Consumer] to receive callbacks of type [TCallback] + * Subscribes to callbacks of type [TCallback] with an optional [JobID] filter. * - * @param TCallback The type of callback to subscribe to. - * If this is [JobID.INVALID], all callbacks of type [TCallback] will be received. - * @param callbackType The type of the callback - * @param jobID The [JobID] of the callbacks that should be subscribed to. - * @param callbackFunc The function to invoke with the callback. - * @return An [Closeable]. Disposing of the return value will unsubscribe the callbackFunc . + * @param TCallback The type of callback to subscribe to. + * @param callbackType The class of the callback type. + * @param jobID Only callbacks matching this [JobID] will be received. + * Use [JobID.INVALID] to receive all callbacks of this type. + * @param callbackFunc The function to invoke when a matching callback is received. + * @return A [Closeable] that unsubscribes [callbackFunc] when closed. */ fun subscribe( callbackType: Class, jobID: JobID, callbackFunc: Consumer, - ): Closeable { - val callback = Callback(callbackType, callbackFunc, this, jobID) - return callback - } + ): Closeable = Callback(callbackType, callbackFunc::accept, this, jobID) /** - * Registers the provided [Consumer] to receive callbacks of type [TCallback] + * Subscribes to all callbacks of type [TCallback]. * - * @param TCallback The type of callback to subscribe to. - * @param callbackType type of the callback - * @param callbackFunc The function to invoke with the callback. - * @return An [Closeable]. Disposing of the return value will unsubscribe the callbackFunc. + * @param TCallback The type of callback to subscribe to. + * @param callbackType The class of the callback type. + * @param callbackFunc The function to invoke when a matching callback is received. + * @return A [Closeable] that unsubscribes [callbackFunc] when closed. */ fun subscribe( callbackType: Class, @@ -122,32 +124,47 @@ class CallbackManager(private val steamClient: SteamClient) { ): Closeable = subscribe(callbackType, JobID.INVALID, callbackFunc) /** - * Registers a callback to receive service notifications from Steam's unified messaging system. + * Subscribes to callbacks of type [TCallback] with a suspending callback and an optional [JobID] filter. * - * This method creates a service subscription that listens for specific notifications from a Steam service. - * When a notification arrives, it validates the notification type and forwards it to the provided callback - * function if the types match. + * Use [runWaitCallbackAsync] to properly await the suspending [callbackFunc]. * - * @param TService The type of Steam service to subscribe to (e.g., GameNotificationsClient) - * @param TNotification The type of notification message to receive (e.g., CGameNotifications_OnNotificationsRequested_Notification.Builder) - * @param serviceClass The class object representing the Steam service - * @param notificationClass The class object representing the notification type - * @param callbackFunc The callback function to be invoked when matching notifications are received - * @return A [Closeable] subscription. Call [Closeable.close] to unsubscribe and clean up resources + * @param TCallback The type of callback to subscribe to. + * @param callbackType The class of the callback type. + * @param jobID Only callbacks matching this [JobID] will be received. + * Use [JobID.INVALID] to receive all callbacks of this type. + * @param callbackFunc The suspending function to invoke when a matching callback is received. + * @return A [Closeable] that unsubscribes [callbackFunc] when closed. + */ + fun subscribe( + callbackType: Class, + jobID: JobID, + callbackFunc: suspend (TCallback) -> Unit, + ): Closeable = Callback(callbackType, callbackFunc, this, jobID) + + /** + * Subscribes to all callbacks of type [TCallback] with a suspending callback. * - * Example usage in Kotlin: - * val subscription = manager.subscribeServiceNotification( - * GameNotificationsClient::class.java, - * CGameNotifications_OnNotificationsRequested_Notification.Builder::class.java, - * this::onGameStartedNotification - * ) + * Use [runWaitCallbackAsync] to properly await the suspending [callbackFunc]. * - * Example usage in Java: - * manager.subscribeServiceNotification( - * GameNotificationsClient.class, - * CGameNotifications_OnNotificationsRequested_Notification.Builder.class, - * notification -> onGameStartedNotification(notification) - * ); + * @param TCallback The type of callback to subscribe to. + * @param callbackType The class of the callback type. + * @param callbackFunc The suspending function to invoke when a matching callback is received. + * @return A [Closeable] that unsubscribes [callbackFunc] when closed. + */ + fun subscribe( + callbackType: Class, + callbackFunc: suspend (TCallback) -> Unit, + ): Closeable = subscribe(callbackType, JobID.INVALID, callbackFunc) + + /** + * Subscribes to service notifications of type [TNotification] from the [TService] Steam unified service. + * + * @param TService The unified service type to subscribe to. + * @param TNotification The notification message type to receive. + * @param serviceClass The class of the unified service. + * @param notificationClass The class of the notification type. + * @param callbackFunc The function to invoke when a matching notification is received. + * @return A [Closeable] that unsubscribes [callbackFunc] when closed. */ @Suppress("UNCHECKED_CAST") fun > subscribeServiceNotification( @@ -165,50 +182,23 @@ class CallbackManager(private val steamClient: SteamClient) { } } - val callback = Callback( + return Callback( callbackType = ServiceMethodNotification::class.java as Class>, - onRun = wrappedCallback, + onRun = wrappedCallback::accept, mgr = this, jobID = JobID.INVALID ) - - return callback } /** - * Registers a callback to receive service responses from Steam's unified messaging system. - * - * This method creates a service subscription that listens for specific responses from a Steam service. - * When a response is received, it validates the response type and forwards it to the provided callback - * function if the types match. Unlike notifications, responses are typically used for request-response - * patterns in the Steam API. + * Subscribes to service responses of type [TNotification] from the [TService] Steam unified service. * - * @param TService The type of Steam service to subscribe to (e.g., GameNotificationsClient) - * @param TNotification The type of response message to receive - * @param serviceClass The class object representing the Steam service - * @param notificationClass The class object representing the response type - * @param callbackFunc The callback function to be invoked when matching responses are received - * @return A [Closeable] subscription. Call [Closeable.close] to unsubscribe and clean up resources - * - * @see ServiceMethodResponse - * @see UnifiedService - * - * Example usage in Kotlin: - * val subscription = manager.subscribeServiceResponse( - * Player::class.java, - * CPlayer_GetGameBadgeLevels_Response.Builder::class.java - * ) { response -> - * println("Badge level: ${response.body.playerLevel}") - * } - * - * Example usage in Java: - * Closeable subscription = manager.subscribeServiceResponse( - * Player.class, - * CPlayer_GetGameBadgeLevels_Response.Builder.class, - * response -> { - * System.out.println("Badge level: " + response.getBody().getPlayerLevel()); - * } - * ); + * @param TService The unified service type to subscribe to. + * @param TNotification The response message type to receive. + * @param serviceClass The class of the unified service. + * @param notificationClass The class of the response type. + * @param callbackFunc The function to invoke when a matching response is received. + * @return A [Closeable] that unsubscribes [callbackFunc] when closed. */ @Suppress("UNCHECKED_CAST") fun > subscribeServiceResponse( @@ -226,21 +216,26 @@ class CallbackManager(private val steamClient: SteamClient) { } } - val callback = Callback( + return Callback( callbackType = ServiceMethodResponse::class.java as Class>, - onRun = wrappedCallback, + onRun = wrappedCallback::accept, mgr = this, jobID = JobID.INVALID, ) - - return callback } //region Kotlin-Helpers + + // @JvmSynthetic + // inline fun subscribe( + // jobID: JobID = JobID.INVALID, + // noinline callbackFunc: (TCallback) -> Unit, + // ): Closeable = subscribe(TCallback::class.java, jobID, callbackFunc) + @JvmSynthetic inline fun subscribe( jobID: JobID = JobID.INVALID, - noinline callbackFunc: (TCallback) -> Unit, + noinline callbackFunc: suspend (TCallback) -> Unit, ): Closeable = subscribe(TCallback::class.java, jobID, callbackFunc) @JvmSynthetic @@ -276,8 +271,16 @@ class CallbackManager(private val steamClient: SteamClient) { private fun handle(call: CallbackMsg) { val callbacks = registeredCallbacks val type = call.javaClass + callbacks.forEach { callback -> + if (callback.callbackType.isAssignableFrom(type)) { + runBlocking { callback.run(call) } + } + } + } - // find handlers interested in this callback + private suspend fun handleAsync(call: CallbackMsg) { + val callbacks = registeredCallbacks + val type = call.javaClass callbacks.forEach { callback -> if (callback.callbackType.isAssignableFrom(type)) { callback.run(call) diff --git a/src/main/java/in/dragonbra/javasteam/types/AsyncJobMultiple.kt b/src/main/java/in/dragonbra/javasteam/types/AsyncJobMultiple.kt index 36fb7d95..f457fb40 100644 --- a/src/main/java/in/dragonbra/javasteam/types/AsyncJobMultiple.kt +++ b/src/main/java/in/dragonbra/javasteam/types/AsyncJobMultiple.kt @@ -4,7 +4,8 @@ import `in`.dragonbra.javasteam.steam.steamclient.AsyncJobFailedException import `in`.dragonbra.javasteam.steam.steamclient.SteamClient import `in`.dragonbra.javasteam.steam.steamclient.callbackmgr.CallbackMsg import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.future.await +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.future.asCompletableFuture import java.util.* import java.util.concurrent.CompletableFuture @@ -24,20 +25,19 @@ class AsyncJobMultiple( var results: List = listOf(), ) - private val future = CompletableFuture>() + private val deferred = CompletableDeferred>() - private val results = mutableListOf() + private val results = Collections.synchronizedList(mutableListOf()) init { registerJob(client) } - @Deprecated("Use toFuture() instead", ReplaceWith("toFuture()")) - fun toDeferred(): CompletableFuture> = toFuture() + // Kotlin + suspend fun await(): ResultSet = deferred.await() - fun toFuture(): CompletableFuture> = future - - suspend fun await(): ResultSet = future.await() + // Java interop + fun toFuture(): CompletableFuture> = deferred.asCompletableFuture() @Suppress("unused") @Throws(CancellationException::class) @@ -51,7 +51,12 @@ class AsyncJobMultiple( results.add(callbackMsg) return if (finishCondition(callbackMsg) == true) { - future.complete(ResultSet(complete = true, failed = false, results = Collections.unmodifiableList(results))) + val result = ResultSet( + complete = true, + failed = false, + results = Collections.unmodifiableList(results) + ) + deferred.complete(result) true } else { heartbeat() @@ -64,14 +69,18 @@ class AsyncJobMultiple( // if we have zero callbacks in our result set, we cancel this task if (dueToRemoteFailure) { // if we're canceling with a remote failure, post a job failure exception - future.completeExceptionally(AsyncJobFailedException()) + deferred.completeExceptionally(AsyncJobFailedException()) } else { // otherwise, normal task cancellation for timeouts - future.cancel(true) + deferred.cancel() } } else { - val resultSet = ResultSet(false, dueToRemoteFailure, Collections.unmodifiableList(results)) - future.complete(resultSet) + val result = ResultSet( + complete = false, + failed = dueToRemoteFailure, + results = Collections.unmodifiableList(results) + ) + deferred.complete(result) } } } diff --git a/src/main/java/in/dragonbra/javasteam/types/AsyncJobSingle.kt b/src/main/java/in/dragonbra/javasteam/types/AsyncJobSingle.kt index dcb8bdd0..2bea7c78 100644 --- a/src/main/java/in/dragonbra/javasteam/types/AsyncJobSingle.kt +++ b/src/main/java/in/dragonbra/javasteam/types/AsyncJobSingle.kt @@ -3,9 +3,10 @@ package `in`.dragonbra.javasteam.types import `in`.dragonbra.javasteam.steam.steamclient.AsyncJobFailedException import `in`.dragonbra.javasteam.steam.steamclient.SteamClient import `in`.dragonbra.javasteam.steam.steamclient.callbackmgr.CallbackMsg -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.future.await -import java.util.concurrent.* +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.future.asCompletableFuture +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ExecutionException /** * @author Lossy @@ -13,27 +14,26 @@ import java.util.concurrent.* */ class AsyncJobSingle(client: SteamClient, jobId: JobID) : AsyncJob(client, jobId) { - private val future = CompletableFuture() + private val deferred = CompletableDeferred() init { registerJob(client) } - @Deprecated("Use toFuture() instead", ReplaceWith("toFuture()")) - fun toDeferred(): CompletableFuture = toFuture() + // Kotlin + suspend fun await(): T = deferred.await() - fun toFuture(): CompletableFuture = future - - suspend fun await(): T = future.await() + // Java interop + fun toFuture(): CompletableFuture = deferred.asCompletableFuture() @Suppress("unused") - @Throws(CancellationException::class) + @Throws(ExecutionException::class, InterruptedException::class) fun runBlock(): T = toFuture().get() override fun addResult(callback: CallbackMsg): Boolean { // we're complete with just this callback @Suppress("UNCHECKED_CAST") - future.complete(callback as T) + deferred.complete(callback as T) // inform steamclient that this job wishes to be removed from tracking since // we've received the single callback we were waiting for @@ -43,10 +43,10 @@ class AsyncJobSingle(client: SteamClient, jobId: JobID) : Async override fun setFailed(dueToRemoteFailure: Boolean) { if (dueToRemoteFailure) { // if steam informs us of a remote failure, we complete with exception - future.completeExceptionally(AsyncJobFailedException()) + deferred.completeExceptionally(AsyncJobFailedException()) } else { // if we time out, we cancel the future - future.cancel(true) + deferred.cancel() } } } diff --git a/src/main/java/in/dragonbra/javasteam/types/JobID.kt b/src/main/java/in/dragonbra/javasteam/types/JobID.kt index a9ae7856..13b186cd 100644 --- a/src/main/java/in/dragonbra/javasteam/types/JobID.kt +++ b/src/main/java/in/dragonbra/javasteam/types/JobID.kt @@ -11,6 +11,9 @@ class JobID : GlobalID { */ @JvmField val INVALID: JobID = JobID() + + @JvmSynthetic + fun Long.toJobID() = JobID(this) } /** diff --git a/src/main/java/in/dragonbra/javasteam/util/NetHelpers.kt b/src/main/java/in/dragonbra/javasteam/util/NetHelpers.kt index 21a4bc99..caa8894b 100644 --- a/src/main/java/in/dragonbra/javasteam/util/NetHelpers.kt +++ b/src/main/java/in/dragonbra/javasteam/util/NetHelpers.kt @@ -12,6 +12,8 @@ import java.net.Socket import java.net.UnknownHostException import java.nio.ByteBuffer +fun CMsgIPAddress.obfuscatePrivateIP() = NetHelpers.obfuscatePrivateIP(this) + /** * @author lngtr * @since 2018-02-22 diff --git a/src/test/java/in/dragonbra/javasteam/steam/handlers/HandlerTestBase.java b/src/test/java/in/dragonbra/javasteam/steam/handlers/HandlerTestBase.java index ff1eeccb..5b282009 100644 --- a/src/test/java/in/dragonbra/javasteam/steam/handlers/HandlerTestBase.java +++ b/src/test/java/in/dragonbra/javasteam/steam/handlers/HandlerTestBase.java @@ -70,6 +70,12 @@ protected C verifyCallback() { return (C) callback; } + protected CallbackMsg getCallback() { + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(CallbackMsg.class); + verify(steamClient, atLeast(1)).postCallback(callbackCaptor.capture()); + return callbackCaptor.getValue(); + } + protected IPacketMsg getPacket(EMsg msgType, boolean isProto) { return CMClient.getPacketMsg(TestPackets.getPacket(msgType, isProto)); } diff --git a/src/test/java/in/dragonbra/javasteam/steam/handlers/steamgameserver/SteamGameServerTest.java b/src/test/java/in/dragonbra/javasteam/steam/handlers/steamgameserver/SteamGameServerTest.java new file mode 100644 index 00000000..fce82c78 --- /dev/null +++ b/src/test/java/in/dragonbra/javasteam/steam/handlers/steamgameserver/SteamGameServerTest.java @@ -0,0 +1,57 @@ +package in.dragonbra.javasteam.steam.handlers.steamgameserver; + +import in.dragonbra.javasteam.enums.EResult; +import in.dragonbra.javasteam.steam.handlers.HandlerTestBase; +import in.dragonbra.javasteam.steam.handlers.steamuser.callback.LoggedOnCallback; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +/** + * @author Lossy + * @since 2026-3-22 + */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class SteamGameServerTest extends HandlerTestBase { + + @Override + protected SteamGameServer createHandler() { + return new SteamGameServer(); + } + + @Test + public void logOnPostsLoggedOnCallbackWhenNoConnection() { + Mockito.when(steamClient.isConnected()).thenReturn(false); + + var details = new LogOnDetails("SuperSecretToken", 0); + var asyncJob = handler.logOn(details); + + var callback = getCallback(); + Assertions.assertNotNull(callback); + Assertions.assertEquals(LoggedOnCallback.class, callback.getClass()); + + var loc = (LoggedOnCallback) callback; + Assertions.assertEquals(EResult.NoConnection, loc.getResult()); + Assertions.assertEquals(asyncJob.getJobID(), loc.getJobID()); + } + + @Test + public void logOnAnonymousPostsLoggedOnCallbackWhenNoConnection() { + Mockito.when(steamClient.isConnected()).thenReturn(false); + + var asyncJob = handler.logOnAnonymous(); + + var callback = getCallback(); + Assertions.assertNotNull(callback); + Assertions.assertEquals(LoggedOnCallback.class, callback.getClass()); + + var loc = (LoggedOnCallback) callback; + Assertions.assertEquals(EResult.NoConnection, loc.getResult()); + Assertions.assertEquals(asyncJob.getJobID(), loc.getJobID()); + } +} diff --git a/src/test/java/in/dragonbra/javasteam/steam/handlers/steamuser/SteamUserTest.java b/src/test/java/in/dragonbra/javasteam/steam/handlers/steamuser/SteamUserTest.java index 11937d94..9e9a5f65 100644 --- a/src/test/java/in/dragonbra/javasteam/steam/handlers/steamuser/SteamUserTest.java +++ b/src/test/java/in/dragonbra/javasteam/steam/handlers/steamuser/SteamUserTest.java @@ -15,15 +15,13 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; import java.util.Date; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; - /** * @author lngtr * @since 2018-03-24 @@ -37,6 +35,82 @@ protected SteamUser createHandler() { return new SteamUser(); } + @Test + public void logOnPostsLoggedOnCallbackWhenNoConnection() { + Mockito.when(steamClient.isConnected()).thenReturn(false); + + var details = new LogOnDetails(); + details.setUsername("iamauser"); + details.setPassword("lamepassword"); + var asyncJob = handler.logOn(details); + + var callback = getCallback(); + Assertions.assertNotNull(callback); + Assertions.assertEquals(LoggedOnCallback.class, callback.getClass()); + + var loc = (LoggedOnCallback) callback; + Assertions.assertEquals(EResult.NoConnection, loc.getResult()); + Assertions.assertEquals(asyncJob.getJobID(), loc.getJobID()); + } + + @Test + public void logOnThrowsExceptionIfDetailsNotProvided() { + Assertions.assertThrows(IllegalArgumentException.class, () -> handler.logOn(new LogOnDetails())); + } + + @Test + public void logOnThrowsExceptionIfUsernameNotProvided_OnlyPassword() { + var details = new LogOnDetails(); + details.setPassword("def"); + Assertions.assertThrows(IllegalArgumentException.class, () -> handler.logOn(details)); + } + + @Test + public void logOnThrowsExceptionIfUsernameNotProvided_OnlyAccessToken() { + var details = new LogOnDetails(); + details.setAccessToken("def"); + Assertions.assertThrows(IllegalArgumentException.class, () -> handler.logOn(details)); + } + + @Test + public void logOnThrowsExceptionIfPasswordAndAccessTokenNotProvided() { + var details = new LogOnDetails(); + details.setUsername("abc"); + Assertions.assertThrows(IllegalArgumentException.class, () -> handler.logOn(details)); + } + + @Test + public void logOnDoesNotThrowExceptionIfUserNameAndPasswordProvided() { + var details = new LogOnDetails(); + details.setUsername("abc"); + details.setPassword("def"); + Assertions.assertDoesNotThrow(() -> handler.logOn(details)); + } + + @Test + public void logOnDoesNotThrowExceptionIfUserNameAndAccessTokenProvided() { + var details = new LogOnDetails(); + details.setUsername("abc"); + details.setPassword("def"); + details.setShouldRememberPassword(true); + Assertions.assertDoesNotThrow(() -> handler.logOn(details)); + } + + @Test + public void logOnAnonymousPostsLoggedOnCallbackWhenNoConnection() { + Mockito.when(steamClient.isConnected()).thenReturn(false); + + var asyncJob = handler.logOnAnonymous(); + + var callback = getCallback(); + Assertions.assertNotNull(callback); + Assertions.assertEquals(LoggedOnCallback.class, callback.getClass()); + + var loc = (LoggedOnCallback) callback; + Assertions.assertEquals(EResult.NoConnection, loc.getResult()); + Assertions.assertEquals(asyncJob.getJobID(), loc.getJobID()); + } + @Test public void logOn() { LogOnDetails details = new LogOnDetails(); @@ -47,14 +121,14 @@ public void logOn() { ClientMsgProtobuf msg = verifySend(EMsg.ClientLogon); - assertEquals("testusername", msg.getBody().getAccountName()); - assertEquals("testpassword", msg.getBody().getPassword()); + Assertions.assertEquals("testusername", msg.getBody().getAccountName()); + Assertions.assertEquals("testpassword", msg.getBody().getPassword()); } @Test public void logOnNotConnected() { - reset(steamClient); - when(steamClient.isConnected()).thenReturn(false); + Mockito.reset(steamClient); + Mockito.when(steamClient.isConnected()).thenReturn(false); LogOnDetails details = new LogOnDetails(); details.setUsername("testusername"); @@ -64,12 +138,12 @@ public void logOnNotConnected() { LoggedOnCallback callback = verifyCallback(); - assertEquals(EResult.NoConnection, callback.getResult()); + Assertions.assertEquals(EResult.NoConnection, callback.getResult()); } @Test public void logOnNoDetails() { - assertThrows(IllegalArgumentException.class, () -> { + Assertions.assertThrows(IllegalArgumentException.class, () -> { LogOnDetails details = new LogOnDetails(); handler.logOn(details); }); @@ -80,35 +154,35 @@ public void logOnAnonymous() { handler.logOnAnonymous(); ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(IClientMsg.class); - verify(steamClient).send(msgCaptor.capture()); + Mockito.verify(steamClient).send(msgCaptor.capture()); ClientMsgProtobuf msg = verifySend(EMsg.ClientLogon); SteamID id = new SteamID(msg.getProtoHeader().getSteamid()); - assertEquals(EAccountType.AnonUser, id.getAccountType()); + Assertions.assertEquals(EAccountType.AnonUser, id.getAccountType()); } @Test public void logOnAnonymousNotConnected() { - reset(steamClient); - when(steamClient.isConnected()).thenReturn(false); + Mockito.reset(steamClient); + Mockito.when(steamClient.isConnected()).thenReturn(false); handler.logOnAnonymous(); LoggedOnCallback callback = verifyCallback(); - assertEquals(EResult.NoConnection, callback.getResult()); + Assertions.assertEquals(EResult.NoConnection, callback.getResult()); } @Test public void logOff() { handler.logOff(); - verify(steamClient).setExpectDisconnection(true); + Mockito.verify(steamClient).setExpectDisconnection(true); ClientMsgProtobuf msg = verifySend(EMsg.ClientLogOff); - assertNotNull(msg); + Assertions.assertNotNull(msg); } @Test @@ -119,7 +193,7 @@ public void handleLogonResponse() { LoggedOnCallback callback = verifyCallback(); - assertEquals(EResult.OK, callback.getResult()); + Assertions.assertEquals(EResult.OK, callback.getResult()); } @Test @@ -130,7 +204,7 @@ public void handleLogonResponseNonProto() { LoggedOnCallback callback = verifyCallback(); - assertEquals(EResult.OK, callback.getResult()); + Assertions.assertEquals(EResult.OK, callback.getResult()); } @Test @@ -141,7 +215,7 @@ public void handleLogOffResponse() { LoggedOffCallback callback = verifyCallback(); - assertEquals(EResult.OK, callback.getResult()); + Assertions.assertEquals(EResult.OK, callback.getResult()); } @Test @@ -152,7 +226,7 @@ public void handleLogOffResponseNonProto() { LoggedOffCallback callback = verifyCallback(); - assertEquals(EResult.OK, callback.getResult()); + Assertions.assertEquals(EResult.OK, callback.getResult()); } @Test @@ -163,7 +237,7 @@ public void handleSessionToken() { SessionTokenCallback callback = verifyCallback(); - assertEquals(123, callback.getSessionToken()); + Assertions.assertEquals(123, callback.getSessionToken()); } @Test @@ -174,8 +248,8 @@ public void handleAccountInfo() { AccountInfoCallback callback = verifyCallback(); - assertEquals("XX", callback.getCountry()); - assertEquals("testpersonaname", callback.getPersonaName()); + Assertions.assertEquals("XX", callback.getCountry()); + Assertions.assertEquals("testpersonaname", callback.getPersonaName()); } @Test @@ -186,10 +260,10 @@ public void handleWalletInfo() { WalletInfoCallback callback = verifyCallback(); - assertFalse(callback.isHasWallet()); - assertEquals(0, callback.getBalance()); - assertEquals(ECurrencyCode.Invalid, callback.getCurrency()); - assertEquals(0L, callback.getLongBalance()); + Assertions.assertFalse(callback.isHasWallet()); + Assertions.assertEquals(0, callback.getBalance()); + Assertions.assertEquals(ECurrencyCode.Invalid, callback.getCurrency()); + Assertions.assertEquals(0L, callback.getLongBalance()); } @Test @@ -200,8 +274,8 @@ public void handleWebAPIUserNonce() { WebAPIUserNonceCallback callback = verifyCallback(); - assertEquals(EResult.OK, callback.getResult()); - assertEquals("testnonce", callback.getNonce()); + Assertions.assertEquals(EResult.OK, callback.getResult()); + Assertions.assertEquals("testnonce", callback.getNonce()); } @Test @@ -212,8 +286,8 @@ public void handleMarketingMessageUpdate() { MarketingMessageCallback callback = verifyCallback(); - assertEquals(new Date(1521763200000L), callback.getUpdateTime()); - assertEquals(7, callback.getMessages().size()); + Assertions.assertEquals(new Date(1521763200000L), callback.getUpdateTime()); + Assertions.assertEquals(7, callback.getMessages().size()); } @Test diff --git a/src/test/java/in/dragonbra/javasteam/steam/steamclient/SteamClientTest.java b/src/test/java/in/dragonbra/javasteam/steam/steamclient/SteamClientTest.java index 59bac2f1..f6050409 100644 --- a/src/test/java/in/dragonbra/javasteam/steam/steamclient/SteamClientTest.java +++ b/src/test/java/in/dragonbra/javasteam/steam/steamclient/SteamClientTest.java @@ -3,7 +3,9 @@ import in.dragonbra.javasteam.base.IPacketMsg; import in.dragonbra.javasteam.steam.handlers.ClientMsgHandler; import in.dragonbra.javasteam.steam.handlers.steamapps.SteamApps; +import in.dragonbra.javasteam.steam.handlers.steamauthticket.SteamAuthTicket; import in.dragonbra.javasteam.steam.handlers.steamcloud.SteamCloud; +import in.dragonbra.javasteam.steam.handlers.steamcontent.SteamContent; import in.dragonbra.javasteam.steam.handlers.steamfriends.SteamFriends; import in.dragonbra.javasteam.steam.handlers.steamgamecoordinator.SteamGameCoordinator; import in.dragonbra.javasteam.steam.handlers.steamgameserver.SteamGameServer; @@ -55,11 +57,12 @@ public void handlersCountCheck() { @Test public void constructorSetsInitialHandlers() { - Assertions.assertNotNull(client.getHandler(SteamFriends.class)); Assertions.assertNotNull(client.getHandler(SteamUser.class)); + Assertions.assertNotNull(client.getHandler(SteamFriends.class)); Assertions.assertNotNull(client.getHandler(SteamApps.class)); Assertions.assertNotNull(client.getHandler(SteamGameCoordinator.class)); Assertions.assertNotNull(client.getHandler(SteamGameServer.class)); + Assertions.assertNotNull(client.getHandler(SteamUserStats.class)); Assertions.assertNotNull(client.getHandler(SteamMasterServer.class)); Assertions.assertNotNull(client.getHandler(SteamCloud.class)); Assertions.assertNotNull(client.getHandler(SteamWorkshop.class)); @@ -67,8 +70,9 @@ public void constructorSetsInitialHandlers() { Assertions.assertNotNull(client.getHandler(SteamScreenshots.class)); Assertions.assertNotNull(client.getHandler(SteamMatchmaking.class)); Assertions.assertNotNull(client.getHandler(SteamNetworking.class)); + Assertions.assertNotNull(client.getHandler(SteamContent.class)); + Assertions.assertNotNull(client.getHandler(SteamAuthTicket.class)); Assertions.assertNotNull(client.getHandler(SteamNotifications.class)); - Assertions.assertNotNull(client.getHandler(SteamUserStats.class)); } @Test @@ -99,6 +103,39 @@ public void removeHandlerRemovesHandlerByInstance() { Assertions.assertNull(client.getHandler(TestMsgHandler.class)); } + @Test + public void addHandlerThrowsOnDuplicateHandler() { + var steamClient = new SteamClient(); + steamClient.addHandler(new TestMsgHandler()); + + Assertions.assertThrows(IllegalArgumentException.class, () -> steamClient.addHandler(new TestMsgHandler())); + } + + @Test + public void removeHandlerByTypeDoesNothingWhenNotRegistered() { + var steamClient = new SteamClient(); + Assertions.assertNull(client.getHandler(TestMsgHandler.class)); + + steamClient.removeHandler(TestMsgHandler.class); + Assertions.assertNull(client.getHandler(TestMsgHandler.class)); + } + + @Test + public void getRequiredHandlerReturnsHandler() { + var steamClient = new SteamClient(); + var handler = steamClient.getRequiredHandler(SteamUser.class); + + Assertions.assertNotNull(handler); + Assertions.assertEquals(SteamUser.class, handler.getClass()); + } + + @Test + public void getRequiredHandlerThrowsWhenNotRegistered() { + var steamClient = new SteamClient(); + + Assertions.assertThrows(IllegalArgumentException.class, () -> steamClient.getRequiredHandler(TestMsgHandler.class)); + } + @Test public void getNextJobIDSetsProcessIDToZero() { var jobID = client.getNextJobID(); diff --git a/src/test/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackManagerAsyncTest.kt b/src/test/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackManagerAsyncTest.kt new file mode 100644 index 00000000..b7726742 --- /dev/null +++ b/src/test/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackManagerAsyncTest.kt @@ -0,0 +1,221 @@ +package `in`.dragonbra.javasteam.steam.steamclient.callbackmgr + +import `in`.dragonbra.javasteam.TestBase +import `in`.dragonbra.javasteam.steam.steamclient.SteamClient +import `in`.dragonbra.javasteam.types.JobID +import kotlinx.coroutines.delay +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.util.UUID +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import kotlin.time.Duration.Companion.milliseconds + +class CallbackManagerAsyncTest : TestBase() { + + private lateinit var client: SteamClient + private lateinit var mgr: CallbackManager + + @BeforeEach + fun setUp() { + client = SteamClient() + mgr = CallbackManager(client) + } + + @Test + fun postedCallbacksTriggerActionsAsync() = runTest { + val callbacks = Array(10) { + CallbackForTest(uniqueID = UUID.randomUUID()) + } + + val numCallbacksRun = AtomicInteger(0) + + mgr.subscribe { cb -> + val index = numCallbacksRun.get() + Assertions.assertTrue(index < callbacks.size) + Assertions.assertEquals(callbacks[index].uniqueID, cb.uniqueID) + numCallbacksRun.incrementAndGet() + }.use { + callbacks.forEach { client.postCallback(it) } + + repeat(callbacks.size) { i -> + mgr.runWaitCallbackAsync() + Assertions.assertEquals(i + 1, numCallbacksRun.get()) + } + + // Callbacks should have been freed. + mgr.runWaitAllCallbacks(0L) + Assertions.assertEquals(10, numCallbacksRun.get()) + } + } + + @Test + fun correctlyAwaitsForAsyncCallbacks() = runTest { + val callback = CallbackForTest(uniqueID = UUID.randomUUID()) + + val numCallbacksRun = AtomicInteger(0) + + mgr.subscribe { cb -> + delay(100.milliseconds) + Assertions.assertEquals(callback.uniqueID, cb.uniqueID) + numCallbacksRun.incrementAndGet() + }.use { + repeat(10) { client.postCallback(callback) } + + repeat(10) { i -> + mgr.runWaitCallbackAsync() + Assertions.assertEquals(i + 1, numCallbacksRun.get()) + } + + mgr.runWaitAllCallbacks(0L) + Assertions.assertEquals(10, numCallbacksRun.get()) + } + } + + @Test + fun asyncCallbackWithJobIDTriggersAction() = runTest { + val jobID = JobID(123456) + val callback = CallbackForTest(uniqueID = UUID.randomUUID()).apply { + this.jobID = jobID + } + + val didCall = AtomicBoolean(false) + + mgr.subscribe(jobID = JobID(123456)) { cb -> + delay(0.milliseconds) // yield + Assertions.assertEquals(callback.uniqueID, cb.uniqueID) + Assertions.assertEquals(jobID, cb.jobID) + didCall.set(true) + }.use { + client.postCallback(callback) + mgr.runWaitCallbackAsync() + } + + Assertions.assertTrue(didCall.get()) + } + + @Test + fun asyncCallbackDoesNotTriggerActionForWrongJobID() = runTest { + val jobID = JobID(123456) + val callback = CallbackForTest(uniqueID = UUID.randomUUID()).apply { + this.jobID = jobID + } + + val didCall = AtomicBoolean(false) + + mgr.subscribe(jobID = JobID(123)) { _ -> + delay(0.milliseconds) // yield + didCall.set(true) + }.use { + client.postCallback(callback) + mgr.runWaitCallbackAsync() + } + + Assertions.assertFalse(didCall.get()) + } + + @Test + fun asyncCallbackIsBlockedBySyncRunCallbacks() = runTest { + val callback = CallbackForTest(uniqueID = UUID.randomUUID()) + + val didCall = AtomicBoolean(false) + + mgr.subscribe { cb -> + delay(50.milliseconds) + Assertions.assertEquals(callback.uniqueID, cb.uniqueID) + didCall.set(true) + }.use { + client.postCallback(callback) + mgr.runCallbacks() + } + + Assertions.assertTrue(didCall.get()) + } + + @Test + fun asyncSubscribedFunctionDoesNotRunWhenSubscriptionIsDisposed() = runTest { + val callback = CallbackForTest() + + val callCount = AtomicInteger(0) + + mgr.subscribe { _ -> + delay(0.milliseconds) // yield + callCount.incrementAndGet() + }.use { + client.postCallback(callback) + mgr.runWaitCallbackAsync() + } + + client.postCallback(callback) + mgr.runWaitCallbackAsync() + + Assertions.assertEquals(1, callCount.get()) + } + + @Test + fun mixedSyncAndAsyncSubscribersBothTrigger() = runTest { + val callback = CallbackForTest(uniqueID = UUID.randomUUID()) + + val syncCalled = AtomicBoolean(false) + val asyncCalled = AtomicBoolean(false) + + val s1 = mgr.subscribe { cb -> + Assertions.assertEquals(callback.uniqueID, cb.uniqueID) + syncCalled.set(true) + } + + val s2 = mgr.subscribe { cb -> + delay(0.milliseconds) // yield + Assertions.assertEquals(callback.uniqueID, cb.uniqueID) + asyncCalled.set(true) + } + + s1.use { + s2.use { + client.postCallback(callback) + mgr.runWaitCallbackAsync() + } + } + + Assertions.assertTrue(syncCalled.get()) + Assertions.assertTrue(asyncCalled.get()) + } + + @Test + fun asyncCallbackExceptionPropagatesOnAsyncPath() = runTest { + val callback = CallbackForTest(uniqueID = UUID.randomUUID()) + + mgr.subscribe { _ -> + delay(0.milliseconds) // yield + throw IllegalStateException("test exception") + }.use { + client.postCallback(callback) + + val ex = Assertions.assertThrows(IllegalStateException::class.java) { + kotlinx.coroutines.runBlocking { mgr.runWaitCallbackAsync() } + } + Assertions.assertEquals("test exception", ex.message) + } + } + + @Test + fun asyncCallbackExceptionWrappedOnSyncPath() { + val callback = CallbackForTest(uniqueID = UUID.randomUUID()) + + mgr.subscribe { _ -> + delay(0.milliseconds) // yield + throw IllegalStateException("test exception") + }.use { + client.postCallback(callback) + + val ex = Assertions.assertThrows(IllegalStateException::class.java) { + mgr.runCallbacks() + } + Assertions.assertEquals("test exception", ex.message) + } + } + + class CallbackForTest(val uniqueID: UUID? = null) : CallbackMsg() +} diff --git a/src/test/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackManagerTest.java b/src/test/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackManagerTest.java index 89521ad6..ed134079 100644 --- a/src/test/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackManagerTest.java +++ b/src/test/java/in/dragonbra/javasteam/steam/steamclient/callbackmgr/CallbackManagerTest.java @@ -14,6 +14,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** * @author lngtr @@ -33,12 +34,11 @@ public void setUp() { @Test public void postedCallbackTriggersAction() { - CallbackForTest callback = new CallbackForTest(); + var callback = new CallbackForTest(); callback.setUniqueID(UUID.randomUUID()); var didCall = new AtomicBoolean(false); - - try (Closeable ignored = mgr.subscribe(CallbackForTest.class, cb -> { + try (var ignored = mgr.subscribe(CallbackForTest.class, cb -> { Assertions.assertEquals(callback.uniqueID, cb.getUniqueID()); didCall.set(true); })) { @@ -52,12 +52,11 @@ public void postedCallbackTriggersAction() { @Test public void postedCallbackTriggersAction_CatchAll() { - final CallbackForTest callback = new CallbackForTest(); + var callback = new CallbackForTest(); callback.setUniqueID(UUID.randomUUID()); var didCall = new AtomicBoolean(false); - - try (Closeable ignored = mgr.subscribe(CallbackForTest.class, cb -> { + try (var ignored = mgr.subscribe(CallbackForTest.class, cb -> { Assertions.assertInstanceOf(CallbackForTest.class, cb); Assertions.assertEquals(callback.getUniqueID(), cb.getUniqueID()); didCall.set(true); @@ -72,17 +71,15 @@ public void postedCallbackTriggersAction_CatchAll() { @Test public void postedCallbackTriggersActionForExplicitJobIDInvalid() { - final JobID jobID = new JobID(123456); - final CallbackForTest callback = new CallbackForTest(); - + var jobID = new JobID(123456); + var callback = new CallbackForTest(); callback.setJobID(jobID); callback.setUniqueID(UUID.randomUUID()); var didCall = new AtomicBoolean(false); - - try (Closeable ignored = mgr.subscribe(CallbackForTest.class, JobID.INVALID, cb -> { - Assertions.assertEquals(jobID, cb.getJobID()); + try (var ignored = mgr.subscribe(CallbackForTest.class, JobID.INVALID, cb -> { Assertions.assertEquals(callback.getUniqueID(), cb.getUniqueID()); + Assertions.assertEquals(jobID, cb.getJobID()); didCall.set(true); })) { postAndRunCallback(callback); @@ -95,17 +92,15 @@ public void postedCallbackTriggersActionForExplicitJobIDInvalid() { @Test public void postedCallbackWithJobIDTriggersActionWhenNoJobIDSpecified() { - final JobID jobID = new JobID(123456); - final CallbackForTest callback = new CallbackForTest(); - + var jobID = new JobID(123456); + var callback = new CallbackForTest(); callback.setJobID(jobID); callback.setUniqueID(UUID.randomUUID()); var didCall = new AtomicBoolean(false); - - try (Closeable ignored = mgr.subscribe(CallbackForTest.class, cb -> { - Assertions.assertEquals(jobID, cb.getJobID()); + try (var ignored = mgr.subscribe(CallbackForTest.class, cb -> { Assertions.assertEquals(callback.getUniqueID(), cb.getUniqueID()); + Assertions.assertEquals(jobID, cb.getJobID()); didCall.set(true); })) { postAndRunCallback(callback); @@ -118,15 +113,13 @@ public void postedCallbackWithJobIDTriggersActionWhenNoJobIDSpecified() { @Test public void postedCallbackDoesNotTriggerActionForWrongJobID() { - JobID jobID = new JobID(123456); - CallbackForTest callback = new CallbackForTest(); - + var jobID = new JobID(123456); + var callback = new CallbackForTest(); callback.setJobID(jobID); callback.setUniqueID(UUID.randomUUID()); var didCall = new AtomicBoolean(false); - - try (Closeable ignored = mgr.subscribe(CallbackForTest.class, new JobID(123), cb -> { + try (var ignored = mgr.subscribe(CallbackForTest.class, new JobID(123), cb -> { didCall.set(true); })) { postAndRunCallback(callback); @@ -139,17 +132,15 @@ public void postedCallbackDoesNotTriggerActionForWrongJobID() { @Test public void postedCallbackWithJobIDTriggersCallbackForJobID() { - final JobID jobID = new JobID(123456); - final CallbackForTest callback = new CallbackForTest(); - + var jobID = new JobID(123456); + var callback = new CallbackForTest(); callback.setJobID(jobID); callback.setUniqueID(UUID.randomUUID()); var didCall = new AtomicBoolean(false); - - try (Closeable ignored = mgr.subscribe(CallbackForTest.class, new JobID(123456), cb -> { - Assertions.assertEquals(jobID, cb.getJobID()); + try (var ignored = mgr.subscribe(CallbackForTest.class, new JobID(123456), cb -> { Assertions.assertEquals(callback.getUniqueID(), cb.getUniqueID()); + Assertions.assertEquals(jobID, cb.getJobID()); didCall.set(true); })) { postAndRunCallback(callback); @@ -162,11 +153,10 @@ public void postedCallbackWithJobIDTriggersCallbackForJobID() { @Test public void subscribedFunctionDoesNotRunWhenSubscriptionIsDisposed() { - CallbackForTest callback = new CallbackForTest(); + var callback = new CallbackForTest(); var callCount = new AtomicInteger(0); - - try (Closeable ignored = mgr.subscribe(CallbackForTest.class, cb -> callCount.incrementAndGet())) { + try (var ignored = mgr.subscribe(CallbackForTest.class, cb -> callCount.incrementAndGet())) { postAndRunCallback(callback); } catch (IOException e) { logger.error(e); @@ -182,7 +172,6 @@ public void postedCallbacksTriggerActions() { callback.setUniqueID(UUID.randomUUID()); var numCallbacksRun = new AtomicInteger(0); - try (var ignored = mgr.subscribe( CallbackForTest.class, cb -> { Assertions.assertEquals(callback.getUniqueID(), cb.getUniqueID()); @@ -193,7 +182,7 @@ public void postedCallbacksTriggerActions() { client.postCallback(callback); } - mgr.runWaitAllCallbacks(1L); // We must provide `some` sort of timeout or null will always happen on 0L + mgr.runWaitAllCallbacks(0L); Assertions.assertEquals(10, numCallbacksRun.get()); // Callbacks should have been freed. @@ -204,6 +193,230 @@ public void postedCallbacksTriggerActions() { } } + @Test + public void postedCallbacksTriggerActionsAsync() throws Exception { + var callbacks = new CallbackForTest[10]; + for (int i = 0; i < callbacks.length; i++) { + callbacks[i] = new CallbackForTest(); + callbacks[i].setUniqueID(UUID.randomUUID()); + } + + var numCallbacksRun = new AtomicInteger(0); + + try (var ignored = mgr.subscribe(CallbackForTest.class, cb -> { + int index = numCallbacksRun.get(); + Assertions.assertTrue(index < callbacks.length); + Assertions.assertEquals(callbacks[index].getUniqueID(), cb.getUniqueID()); + numCallbacksRun.incrementAndGet(); + })) { + for (var callback : callbacks) { + client.postCallback(callback); + } + + for (int i = 1; i <= callbacks.length; i++) { + mgr.runWaitCallbacks(); + Assertions.assertEquals(i, numCallbacksRun.get()); + } + + // Callbacks should have been freed. + mgr.runWaitAllCallbacks(0L); + Assertions.assertEquals(10, numCallbacksRun.get()); + } + } + + @Test + public void correctlyUnsubscribesFromInsideOfCallback() throws IOException { + var callback = new CallbackForTest(); + callback.setUniqueID(UUID.randomUUID()); + + try (var s1 = mgr.subscribe(CallbackForTest.class, cb -> { /* nothing */ })) { + var subscription = new AtomicReference(); + + subscription.set(mgr.subscribe(CallbackForTest.class, cb -> { + Assertions.assertNotNull(subscription.get()); + try { + subscription.get().close(); + } catch (IOException e) { + logger.error(e); + } + subscription.set(null); + })); + + postAndRunCallback(callback); + Assertions.assertNull(subscription.get()); + } + } + + @Test + public void correctlySubscribesFromInsideOfCallback() { + var callback = new CallbackForTest(); + callback.setUniqueID(UUID.randomUUID()); + + try ( + var s1 = mgr.subscribe(CallbackForTest.class, cb -> { /* nothing */ }); + var se = mgr.subscribe(CallbackForTest.class, cb -> { + try (var s2 = mgr.subscribe(CallbackForTest.class, cb2 -> { /* nothing */ })) { + // subscribed and immediately disposed within callback + } catch (IOException e) { + logger.error(e); + } + }) + ) { + postAndRunCallback(callback); + } catch (IOException e) { + logger.error(e); + } + } + + @Test + public void correctlyAwaitsForAsyncCallbacks() throws Exception { + var callback = new CallbackForTest(); + callback.setUniqueID(UUID.randomUUID()); + + var numCallbacksRun = new AtomicInteger(0); + + try (var ignored = mgr.subscribe(CallbackForTest.class, cb -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + Assertions.assertEquals(callback.getUniqueID(), cb.getUniqueID()); + numCallbacksRun.incrementAndGet(); + })) { + for (int i = 0; i < 10; i++) { + client.postCallback(callback); + } + + for (int i = 1; i <= 10; i++) { + mgr.runWaitCallbacks(); + Assertions.assertEquals(i, numCallbacksRun.get()); + } + + mgr.runWaitAllCallbacks(0L); + Assertions.assertEquals(10, numCallbacksRun.get()); + } + } + + @Test + public void asyncCallbackWithJobIDTriggersAction() throws IOException { + var jobID = new JobID(123456); + var callback = new CallbackForTest(); + callback.setJobID(jobID); + callback.setUniqueID(UUID.randomUUID()); + + var didCall = new AtomicBoolean(false); + + try (var ignored = mgr.subscribe(CallbackForTest.class, new JobID(123456), cb -> { + Assertions.assertEquals(callback.getUniqueID(), cb.getUniqueID()); + Assertions.assertEquals(jobID, cb.getJobID()); + didCall.set(true); + })) { + postAndRunCallback(callback); + } + + Assertions.assertTrue(didCall.get()); + } + + @Test + public void asyncCallbackDoesNotTriggerActionForWrongJobID() throws IOException { + var jobID = new JobID(123456); + var callback = new CallbackForTest(); + callback.setJobID(jobID); + callback.setUniqueID(UUID.randomUUID()); + + var didCall = new AtomicBoolean(false); + + try (var ignored = mgr.subscribe(CallbackForTest.class, new JobID(123), cb -> { + didCall.set(true); + })) { + postAndRunCallback(callback); + } + + Assertions.assertFalse(didCall.get()); + } + + @Test + public void asyncCallbackIsBlockedBySyncRunCallbacks() throws IOException { + var callback = new CallbackForTest(); + callback.setUniqueID(UUID.randomUUID()); + + var didCall = new AtomicBoolean(false); + + try (var ignored = mgr.subscribe(CallbackForTest.class, cb -> { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + Assertions.assertEquals(callback.getUniqueID(), cb.getUniqueID()); + didCall.set(true); + })) { + postAndRunCallback(callback); + } + + Assertions.assertTrue(didCall.get()); + } + + @Test + public void asyncSubscribedFunctionDoesNotRunWhenSubscriptionIsDisposed() throws IOException { + var callback = new CallbackForTest(); + + var callCount = new AtomicInteger(0); + + try (var ignored = mgr.subscribe(CallbackForTest.class, cb -> callCount.incrementAndGet())) { + postAndRunCallback(callback); + } + postAndRunCallback(callback); + + Assertions.assertEquals(1, callCount.get()); + } + + @Test + public void mixedSyncAndAsyncSubscribersBothTrigger() throws IOException { + var callback = new CallbackForTest(); + callback.setUniqueID(UUID.randomUUID()); + + var syncCalled = new AtomicBoolean(false); + var asyncCalled = new AtomicBoolean(false); + + try ( + var s1 = mgr.subscribe(CallbackForTest.class, cb -> { + Assertions.assertEquals(callback.getUniqueID(), cb.getUniqueID()); + syncCalled.set(true); + }); + var s2 = mgr.subscribe(CallbackForTest.class, cb -> { + Assertions.assertEquals(callback.getUniqueID(), cb.getUniqueID()); + asyncCalled.set(true); + }) + ) { + postAndRunCallback(callback); + } + + Assertions.assertTrue(syncCalled.get()); + Assertions.assertTrue(asyncCalled.get()); + } + + @Test + public void asyncCallbackExceptionPropagatesOnSyncPath() { + var callback = new CallbackForTest(); + callback.setUniqueID(UUID.randomUUID()); + + try (var ignored = mgr.subscribe(CallbackForTest.class, cb -> { + throw new RuntimeException("test exception"); + })) { + client.postCallback(callback); + + var ex = Assertions.assertThrows( + RuntimeException.class, + () -> mgr.runCallbacks() + ); + Assertions.assertEquals("test exception", ex.getMessage()); + } catch (IOException e) { + logger.error(e); + } + } + private void postAndRunCallback(CallbackMsg callback) { client.postCallback(callback); mgr.runCallbacks();