From 93be26a946ab60af34933136cf6f71fff7ecc18b Mon Sep 17 00:00:00 2001 From: William Date: Wed, 19 Jan 2022 17:29:25 +0000 Subject: [PATCH] Use JedisPool instead of single Jedis connection --- .../william278/husksync/HuskSyncBukkit.java | 6 ++- .../bukkit/listener/BukkitRedisListener.java | 1 + .../husksync/HuskSyncBungeeCord.java | 9 ++-- .../bungeecord/command/BungeeCommand.java | 10 ++-- .../listener/BungeeRedisListener.java | 1 + .../husksync/migrator/MPDBMigrator.java | 7 +-- .../husksync/redis/RedisListener.java | 54 +++++++++++++------ .../husksync/redis/RedisMessage.java | 29 ++++++---- .../william278/husksync/HuskSyncVelocity.java | 5 +- .../velocity/command/VelocityCommand.java | 2 +- .../listener/VelocityRedisListener.java | 1 + 11 files changed, 83 insertions(+), 42 deletions(-) diff --git a/bukkit/src/main/java/me/william278/husksync/HuskSyncBukkit.java b/bukkit/src/main/java/me/william278/husksync/HuskSyncBukkit.java index d987b4d2..d587ada7 100644 --- a/bukkit/src/main/java/me/william278/husksync/HuskSyncBukkit.java +++ b/bukkit/src/main/java/me/william278/husksync/HuskSyncBukkit.java @@ -17,7 +17,6 @@ import org.bukkit.scheduler.BukkitTask; import java.io.IOException; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.logging.Level; public final class HuskSyncBukkit extends JavaPlugin { @@ -32,6 +31,8 @@ public final class HuskSyncBukkit extends JavaPlugin { public static BukkitDataCache bukkitCache; + public static BukkitRedisListener redisListener; + // Used for establishing a handshake with redis public static UUID serverUUID; @@ -120,7 +121,8 @@ public final class HuskSyncBukkit extends JavaPlugin { getServer().getPluginManager().registerEvents(new BukkitEventListener(), this); // Initialize the redis listener - if (!new BukkitRedisListener().isActiveAndEnabled) { + redisListener = new BukkitRedisListener(); + if (!redisListener.isActiveAndEnabled) { getPluginLoader().disablePlugin(this); getLogger().severe("Failed to initialize Redis; disabling HuskSync (" + getServer().getName() + ") v" + getDescription().getVersion()); return; diff --git a/bukkit/src/main/java/me/william278/husksync/bukkit/listener/BukkitRedisListener.java b/bukkit/src/main/java/me/william278/husksync/bukkit/listener/BukkitRedisListener.java index 2726c8b3..a8dc339e 100644 --- a/bukkit/src/main/java/me/william278/husksync/bukkit/listener/BukkitRedisListener.java +++ b/bukkit/src/main/java/me/william278/husksync/bukkit/listener/BukkitRedisListener.java @@ -26,6 +26,7 @@ public class BukkitRedisListener extends RedisListener { // Initialize the listener on the bukkit server public BukkitRedisListener() { + super(); listen(); } diff --git a/bungeecord/src/main/java/me/william278/husksync/HuskSyncBungeeCord.java b/bungeecord/src/main/java/me/william278/husksync/HuskSyncBungeeCord.java index 083c8b55..2e0bc688 100644 --- a/bungeecord/src/main/java/me/william278/husksync/HuskSyncBungeeCord.java +++ b/bungeecord/src/main/java/me/william278/husksync/HuskSyncBungeeCord.java @@ -3,12 +3,12 @@ package me.william278.husksync; import me.william278.husksync.bungeecord.command.BungeeCommand; import me.william278.husksync.bungeecord.config.ConfigLoader; import me.william278.husksync.bungeecord.config.ConfigManager; -import me.william278.husksync.proxy.data.DataManager; import me.william278.husksync.bungeecord.listener.BungeeEventListener; import me.william278.husksync.bungeecord.listener.BungeeRedisListener; -import me.william278.husksync.migrator.MPDBMigrator; import me.william278.husksync.bungeecord.util.BungeeLogger; import me.william278.husksync.bungeecord.util.BungeeUpdateChecker; +import me.william278.husksync.migrator.MPDBMigrator; +import me.william278.husksync.proxy.data.DataManager; import me.william278.husksync.redis.RedisMessage; import me.william278.husksync.util.Logger; import net.byteflux.libby.BungeeLibraryManager; @@ -48,6 +48,8 @@ public final class HuskSyncBungeeCord extends Plugin { public static MPDBMigrator mpdbMigrator; + public static BungeeRedisListener redisListener; + private Logger logger; public Logger getBungeeLogger() { @@ -98,7 +100,8 @@ public final class HuskSyncBungeeCord extends Plugin { } // Initialize the redis listener - if (!new BungeeRedisListener().isActiveAndEnabled) { + redisListener = new BungeeRedisListener(); + if (!redisListener.isActiveAndEnabled) { getBungeeLogger().severe("Failed to initialize Redis; HuskSync will now abort loading itself (" + getProxy().getName() + ") v" + getDescription().getVersion()); return; } diff --git a/bungeecord/src/main/java/me/william278/husksync/bungeecord/command/BungeeCommand.java b/bungeecord/src/main/java/me/william278/husksync/bungeecord/command/BungeeCommand.java index 63dcce89..2c670448 100644 --- a/bungeecord/src/main/java/me/william278/husksync/bungeecord/command/BungeeCommand.java +++ b/bungeecord/src/main/java/me/william278/husksync/bungeecord/command/BungeeCommand.java @@ -2,16 +2,16 @@ package me.william278.husksync.bungeecord.command; import de.themoep.minedown.MineDown; import me.william278.husksync.HuskSyncBungeeCord; -import me.william278.husksync.Server; -import me.william278.husksync.bungeecord.util.BungeeUpdateChecker; -import me.william278.husksync.proxy.command.HuskSyncCommand; -import me.william278.husksync.util.MessageManager; import me.william278.husksync.PlayerData; +import me.william278.husksync.Server; import me.william278.husksync.Settings; import me.william278.husksync.bungeecord.config.ConfigLoader; import me.william278.husksync.bungeecord.config.ConfigManager; +import me.william278.husksync.bungeecord.util.BungeeUpdateChecker; import me.william278.husksync.migrator.MPDBMigrator; +import me.william278.husksync.proxy.command.HuskSyncCommand; import me.william278.husksync.redis.RedisMessage; +import me.william278.husksync.util.MessageManager; import net.md_5.bungee.api.CommandSender; import net.md_5.bungee.api.ProxyServer; import net.md_5.bungee.api.connection.ProxiedPlayer; @@ -301,7 +301,7 @@ public class BungeeCommand extends Command implements TabExecutor, HuskSyncComma HuskSyncBungeeCord.synchronisedServers)) { ProxyServer.getInstance().getScheduler().runAsync(plugin, () -> HuskSyncBungeeCord.mpdbMigrator.executeMigrationOperations(HuskSyncBungeeCord.dataManager, - HuskSyncBungeeCord.synchronisedServers)); + HuskSyncBungeeCord.synchronisedServers, HuskSyncBungeeCord.redisListener)); } } default -> sender.sendMessage(new MineDown("Error: Invalid argument for migration. Use \"husksync migrate\" to start the process").toComponent()); diff --git a/bungeecord/src/main/java/me/william278/husksync/bungeecord/listener/BungeeRedisListener.java b/bungeecord/src/main/java/me/william278/husksync/bungeecord/listener/BungeeRedisListener.java index 95cb7992..19796b0b 100644 --- a/bungeecord/src/main/java/me/william278/husksync/bungeecord/listener/BungeeRedisListener.java +++ b/bungeecord/src/main/java/me/william278/husksync/bungeecord/listener/BungeeRedisListener.java @@ -24,6 +24,7 @@ public class BungeeRedisListener extends RedisListener { // Initialize the listener on the bungee public BungeeRedisListener() { + super(); listen(); } diff --git a/common/src/main/java/me/william278/husksync/migrator/MPDBMigrator.java b/common/src/main/java/me/william278/husksync/migrator/MPDBMigrator.java index 6c6b2b79..32b5bcfb 100644 --- a/common/src/main/java/me/william278/husksync/migrator/MPDBMigrator.java +++ b/common/src/main/java/me/william278/husksync/migrator/MPDBMigrator.java @@ -6,6 +6,7 @@ import me.william278.husksync.Settings; import me.william278.husksync.proxy.data.DataManager; import me.william278.husksync.proxy.data.sql.Database; import me.william278.husksync.proxy.data.sql.MySQL; +import me.william278.husksync.redis.RedisListener; import me.william278.husksync.redis.RedisMessage; import me.william278.husksync.util.Logger; @@ -95,7 +96,7 @@ public class MPDBMigrator { } // Carry out the migration - public void executeMigrationOperations(DataManager dataManager, HashSet synchronisedServers) { + public void executeMigrationOperations(DataManager dataManager, HashSet synchronisedServers, RedisListener redisListener) { // Prepare the target database for insertion prepareTargetDatabase(dataManager); @@ -109,7 +110,7 @@ public class MPDBMigrator { getExperienceData(); // Send the encoded data to the Bukkit servers for conversion - sendEncodedData(synchronisedServers); + sendEncodedData(synchronisedServers, redisListener); } // Clear the new database out of current data @@ -200,7 +201,7 @@ public class MPDBMigrator { } } - private void sendEncodedData(HashSet synchronisedServers) { + private void sendEncodedData(HashSet synchronisedServers, RedisListener redisListener) { for (Server processingServer : synchronisedServers) { if (processingServer.hasMySqlPlayerDataBridge()) { for (MPDBPlayerData data : mpdbPlayerData) { diff --git a/common/src/main/java/me/william278/husksync/redis/RedisListener.java b/common/src/main/java/me/william278/husksync/redis/RedisListener.java index c5fb29f1..0fd23a57 100644 --- a/common/src/main/java/me/william278/husksync/redis/RedisListener.java +++ b/common/src/main/java/me/william278/husksync/redis/RedisListener.java @@ -1,9 +1,7 @@ package me.william278.husksync.redis; import me.william278.husksync.Settings; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisClientConfig; -import redis.clients.jedis.JedisPubSub; +import redis.clients.jedis.*; import redis.clients.jedis.exceptions.JedisException; import java.io.IOException; @@ -16,6 +14,18 @@ public abstract class RedisListener { */ public boolean isActiveAndEnabled; + /** + * Pool of connections to the Redis server + */ + private static JedisPool jedisPool; + + /** + * Creates a new RedisListener and initialises the Redis connection + */ + public RedisListener() { + jedisPool = new JedisPool(new JedisPoolConfig(), Settings.redisHost, Settings.redisPort); + } + /** * Handle an incoming {@link RedisMessage} * @@ -31,20 +41,34 @@ public abstract class RedisListener { */ public abstract void log(Level level, String message); + /** + * Fetch a connection to the Redis server from the JedisPool + * + * @return Jedis instance from the pool + */ + public static Jedis getJedisConnection() { + return jedisPool.getResource(); + } + /** * Start the Redis listener */ public final void listen() { - try (Jedis jedis = new Jedis(Settings.redisHost, Settings.redisPort)) { - final String jedisPassword = Settings.redisPassword; - jedis.connect(); - if (jedis.isConnected()) { + final String jedisPassword = Settings.redisPassword; + new Thread(() -> { + try (Jedis jedis = getJedisConnection()) { if (!jedisPassword.equals("")) { jedis.auth(jedisPassword); } - isActiveAndEnabled = true; - log(Level.INFO, "Enabled Redis listener successfully!"); - new Thread(() -> jedis.subscribe(new JedisPubSub() { + if (jedis.isConnected()) { + isActiveAndEnabled = true; + log(Level.INFO, "Enabled Redis listener successfully!"); + } else { + isActiveAndEnabled = false; + log(Level.SEVERE, "Connection to the Redis server could not be established, please check the credentials."); + return; + } + jedis.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { // Only accept messages to the HuskSync channel @@ -59,13 +83,11 @@ public abstract class RedisListener { log(Level.SEVERE, "Failed to deserialize message target"); } } - }, RedisMessage.REDIS_CHANNEL), "Redis Subscriber").start(); - } else { + }, RedisMessage.REDIS_CHANNEL); + } catch (JedisException | IllegalStateException e) { + log(Level.SEVERE, "An exception occurred with the Jedis Subscriber!"); isActiveAndEnabled = false; - log(Level.SEVERE, "Failed to initialize the redis listener!"); } - } catch (JedisException e) { - log(Level.SEVERE, "Failed to establish a connection to the Redis server!"); - } + }, "Redis Subscriber").start(); } } diff --git a/common/src/main/java/me/william278/husksync/redis/RedisMessage.java b/common/src/main/java/me/william278/husksync/redis/RedisMessage.java index 951bbbb9..abac0c0a 100644 --- a/common/src/main/java/me/william278/husksync/redis/RedisMessage.java +++ b/common/src/main/java/me/william278/husksync/redis/RedisMessage.java @@ -22,8 +22,9 @@ public class RedisMessage { /** * Create a new RedisMessage - * @param type The type of the message - * @param target Who will receive this message + * + * @param type The type of the message + * @param target Who will receive this message * @param messageData The message data elements */ public RedisMessage(MessageType type, MessageTarget target, String... messageData) { @@ -38,6 +39,7 @@ public class RedisMessage { /** * Get a new RedisMessage from an incoming message string + * * @param messageString The message string to parse */ public RedisMessage(String messageString) throws IOException, ClassNotFoundException { @@ -49,6 +51,7 @@ public class RedisMessage { /** * Returns the full, formatted message string with type, target & data + * * @return The fully formatted message */ private String getFullMessage() throws IOException { @@ -61,21 +64,23 @@ public class RedisMessage { * Send the redis message */ public void send() throws IOException { - try (Jedis publisher = new Jedis(Settings.redisHost, Settings.redisPort)) { - final String jedisPassword = Settings.redisPassword; - publisher.connect(); - if (!jedisPassword.equals("")) { - publisher.auth(jedisPassword); - } - publisher.publish(REDIS_CHANNEL, getFullMessage()); + try (Jedis publisher = RedisListener.getJedisConnection()) { + final String jedisPassword = Settings.redisPassword; + publisher.connect(); + if (!jedisPassword.equals("")) { + publisher.auth(jedisPassword); } + publisher.publish(REDIS_CHANNEL, getFullMessage()); + } } public String getMessageData() { return messageData; } - public String[] getMessageDataElements() { return messageData.split(MESSAGE_DATA_SEPARATOR); } + public String[] getMessageDataElements() { + return messageData.split(MESSAGE_DATA_SEPARATOR); + } public MessageType getMessageType() { return messageType; @@ -173,7 +178,9 @@ public class RedisMessage { /** * A record that defines the target of a plugin message; a spigot server or the proxy server(s). For Bukkit servers, the name of the server must also be specified */ - public record MessageTarget(Settings.ServerType targetServerType, UUID targetPlayerUUID, String targetClusterId) implements Serializable { } + public record MessageTarget(Settings.ServerType targetServerType, UUID targetPlayerUUID, + String targetClusterId) implements Serializable { + } /** * Deserialize an object from a Base64 string diff --git a/velocity/src/main/java/me/william278/husksync/HuskSyncVelocity.java b/velocity/src/main/java/me/william278/husksync/HuskSyncVelocity.java index 8a56a331..b2526008 100644 --- a/velocity/src/main/java/me/william278/husksync/HuskSyncVelocity.java +++ b/velocity/src/main/java/me/william278/husksync/HuskSyncVelocity.java @@ -69,6 +69,8 @@ public class HuskSyncVelocity { public static DataManager dataManager; + public static VelocityRedisListener redisListener; + public static MPDBMigrator mpdbMigrator; private final Logger logger; @@ -146,7 +148,8 @@ public class HuskSyncVelocity { } // Initialize the redis listener - if (!new VelocityRedisListener().isActiveAndEnabled) { + redisListener = new VelocityRedisListener(); + if (!redisListener.isActiveAndEnabled) { getVelocityLogger().severe("Failed to initialize Redis; HuskSync will now abort loading itself (Velocity) v" + VERSION); return; } diff --git a/velocity/src/main/java/me/william278/husksync/velocity/command/VelocityCommand.java b/velocity/src/main/java/me/william278/husksync/velocity/command/VelocityCommand.java index 9825574e..7de1fe17 100644 --- a/velocity/src/main/java/me/william278/husksync/velocity/command/VelocityCommand.java +++ b/velocity/src/main/java/me/william278/husksync/velocity/command/VelocityCommand.java @@ -294,7 +294,7 @@ public class VelocityCommand implements SimpleCommand, HuskSyncCommand { HuskSyncVelocity.synchronisedServers)) { plugin.getProxyServer().getScheduler().buildTask(plugin, () -> HuskSyncVelocity.mpdbMigrator.executeMigrationOperations(HuskSyncVelocity.dataManager, - HuskSyncVelocity.synchronisedServers)).schedule(); + HuskSyncVelocity.synchronisedServers, HuskSyncVelocity.redisListener)).schedule(); } } default -> sender.sendMessage(new MineDown("Error: Invalid argument for migration. Use \"husksync migrate\" to start the process").toComponent()); diff --git a/velocity/src/main/java/me/william278/husksync/velocity/listener/VelocityRedisListener.java b/velocity/src/main/java/me/william278/husksync/velocity/listener/VelocityRedisListener.java index 4fa139ff..d6c3b882 100644 --- a/velocity/src/main/java/me/william278/husksync/velocity/listener/VelocityRedisListener.java +++ b/velocity/src/main/java/me/william278/husksync/velocity/listener/VelocityRedisListener.java @@ -23,6 +23,7 @@ public class VelocityRedisListener extends RedisListener { // Initialize the listener on the bungee public VelocityRedisListener() { + super(); listen(); }