Fix data sync when changing servers, consume keys when retrieved

feat/data-edit-commands
William 3 years ago
parent 38c261871a
commit d78dd42b72

@ -3,10 +3,8 @@ package net.william278.husksync.listener;
import net.william278.husksync.BukkitHuskSync;
import net.william278.husksync.player.BukkitPlayer;
import org.bukkit.Bukkit;
import org.bukkit.event.Cancellable;
import org.bukkit.event.EventHandler;
import org.bukkit.event.Listener;
import org.bukkit.event.player.PlayerEvent;
import org.bukkit.event.player.PlayerJoinEvent;
import org.bukkit.event.player.PlayerQuitEvent;
import org.bukkit.event.world.WorldSaveEvent;
@ -32,7 +30,7 @@ public class BukkitEventListener extends EventListener implements Listener {
BukkitPlayer.remove(event.getPlayer());
}
@EventHandler
@EventHandler(ignoreCancelled = true)
public void onWorldSave(@NotNull WorldSaveEvent event) {
super.handleWorldSave(event.getWorld().getPlayers().stream().map(BukkitPlayer::adapt)
.collect(Collectors.toList()));

@ -70,9 +70,10 @@ public class BukkitPlayer extends OnlineUser {
@Override
public CompletableFuture<Void> setStatus(@NotNull StatusData statusData,
boolean setHealth, boolean setMaxHealth,
boolean setHunger, boolean setExperience,
boolean setGameMode, boolean setFlying) {
final boolean setHealth, final boolean setMaxHealth,
final boolean setHunger, final boolean setExperience,
final boolean setGameMode, final boolean setFlying,
final boolean setSelectedItemSlot) {
return CompletableFuture.runAsync(() -> {
double currentMaxHealth = Objects.requireNonNull(player.getAttribute(Attribute.GENERIC_MAX_HEALTH))
.getBaseValue();
@ -101,20 +102,26 @@ public class BukkitPlayer extends OnlineUser {
player.setSaturation(statusData.saturation);
player.setExhaustion(statusData.saturationExhaustion);
}
if (setSelectedItemSlot) {
player.getInventory().setHeldItemSlot(statusData.selectedItemSlot);
}
if (setExperience) {
player.setTotalExperience(statusData.totalExperience);
player.setLevel(statusData.expLevel);
player.setExp(statusData.expProgress);
}
if (setGameMode) {
player.setGameMode(GameMode.valueOf(statusData.gameMode));
Bukkit.getScheduler().runTask(BukkitHuskSync.getInstance(), () ->
player.setGameMode(GameMode.valueOf(statusData.gameMode)));
}
if (setFlying) {
if (statusData.isFlying) {
player.setAllowFlight(true);
player.setFlying(true);
}
player.setFlying(false);
Bukkit.getScheduler().runTask(BukkitHuskSync.getInstance(), () -> {
if (statusData.isFlying) {
player.setAllowFlight(true);
player.setFlying(true);
}
player.setFlying(false);
});
}
});
}
@ -189,7 +196,8 @@ public class BukkitPlayer extends OnlineUser {
@Override
public CompletableFuture<Void> setAdvancements(@NotNull List<AdvancementData> advancementData) {
return CompletableFuture.runAsync(() -> {
return CompletableFuture.runAsync(() -> Bukkit.getScheduler().runTask(BukkitHuskSync.getInstance(), () -> {
// Temporarily disable advancement announcing if needed
boolean announceAdvancementUpdate = false;
if (Boolean.TRUE.equals(player.getWorld().getGameRuleValue(GameRule.ANNOUNCE_ADVANCEMENTS))) {
@ -205,50 +213,53 @@ public class BukkitPlayer extends OnlineUser {
// Determines whether the experience might have changed warranting an update
final AtomicBoolean correctExperience = new AtomicBoolean(false);
// Apply the advancements to the player
final Iterator<Advancement> serverAdvancements = Bukkit.getServer().advancementIterator();
while (serverAdvancements.hasNext()) {
// Iterate through all advancements
final Advancement advancement = serverAdvancements.next();
final AdvancementProgress playerProgress = player.getAdvancementProgress(advancement);
advancementData.stream().filter(record -> record.key.equals(advancement.getKey().toString())).findFirst().ifPresentOrElse(
// Award all criteria that the player does not have that they do on the cache
record -> {
record.completedCriteria.keySet().stream()
.filter(criterion -> !playerProgress.getAwardedCriteria().contains(criterion))
.forEach(criterion -> {
Bukkit.getScheduler().runTask(BukkitHuskSync.getInstance(),
() -> player.getAdvancementProgress(advancement).awardCriteria(criterion));
correctExperience.set(true);
});
// Revoke all criteria that the player does have but should not
new ArrayList<>(playerProgress.getAwardedCriteria()).stream().filter(criterion -> !record.completedCriteria.containsKey(criterion))
.forEach(criterion -> Bukkit.getScheduler().runTask(BukkitHuskSync.getInstance(),
() -> player.getAdvancementProgress(advancement).revokeCriteria(criterion)));
},
// Revoke the criteria as the player shouldn't have any
() -> new ArrayList<>(playerProgress.getAwardedCriteria()).forEach(criterion ->
Bukkit.getScheduler().runTask(BukkitHuskSync.getInstance(),
() -> player.getAdvancementProgress(advancement).revokeCriteria(criterion))));
// Update the player's experience in case the advancement changed that
if (correctExperience.get()) {
player.setLevel(experienceLevel);
player.setExp(expProgress);
correctExperience.set(false);
// Run asynchronously as advancement setting is expensive
CompletableFuture.runAsync(() -> {
// Apply the advancements to the player
final Iterator<Advancement> serverAdvancements = Bukkit.getServer().advancementIterator();
while (serverAdvancements.hasNext()) {
// Iterate through all advancements
final Advancement advancement = serverAdvancements.next();
final AdvancementProgress playerProgress = player.getAdvancementProgress(advancement);
advancementData.stream().filter(record -> record.key.equals(advancement.getKey().toString())).findFirst().ifPresentOrElse(
// Award all criteria that the player does not have that they do on the cache
record -> {
record.completedCriteria.keySet().stream()
.filter(criterion -> !playerProgress.getAwardedCriteria().contains(criterion))
.forEach(criterion -> {
Bukkit.getScheduler().runTask(BukkitHuskSync.getInstance(),
() -> player.getAdvancementProgress(advancement).awardCriteria(criterion));
correctExperience.set(true);
});
// Revoke all criteria that the player does have but should not
new ArrayList<>(playerProgress.getAwardedCriteria()).stream().filter(criterion -> !record.completedCriteria.containsKey(criterion))
.forEach(criterion -> Bukkit.getScheduler().runTask(BukkitHuskSync.getInstance(),
() -> player.getAdvancementProgress(advancement).revokeCriteria(criterion)));
},
// Revoke the criteria as the player shouldn't have any
() -> new ArrayList<>(playerProgress.getAwardedCriteria()).forEach(criterion ->
Bukkit.getScheduler().runTask(BukkitHuskSync.getInstance(),
() -> player.getAdvancementProgress(advancement).revokeCriteria(criterion))));
// Update the player's experience in case the advancement changed that
if (correctExperience.get()) {
player.setLevel(experienceLevel);
player.setExp(expProgress);
correctExperience.set(false);
}
}
}
// Re-enable announcing advancements (back on main thread again)
Bukkit.getScheduler().runTask(BukkitHuskSync.getInstance(), () -> {
if (finalAnnounceAdvancementUpdate) {
player.getWorld().setGameRule(GameRule.ANNOUNCE_ADVANCEMENTS, true);
}
// Re-enable announcing advancements (back on main thread again)
Bukkit.getScheduler().runTask(BukkitHuskSync.getInstance(), () -> {
if (finalAnnounceAdvancementUpdate) {
player.getWorld().setGameRule(GameRule.ANNOUNCE_ADVANCEMENTS, true);
}
});
});
});
}));
}
@Override
@ -397,6 +408,16 @@ public class BukkitPlayer extends OnlineUser {
});
}
@Override
public boolean isDead() {
return player.isDead() || player.getHealth() <= 0;
}
@Override
public boolean isOffline() {
return player == null;
}
@Override
public boolean hasPermission(@NotNull String node) {
return player.hasPermission(node);

@ -35,7 +35,7 @@ public class HuskSyncCommand extends CommandBase implements TabCompletable, Cons
"[•](white) [Currently running:](#00fb9a) [Version " + updateChecker.getCurrentVersion() + "](gray)" +
"[•](white) [Download links:](#00fb9a) [[⏩ Spigot]](gray open_url=https://www.spigotmc.org/resources/husksync.97144/updates) [•](#262626) [[⏩ Polymart]](gray open_url=https://polymart.org/resource/husksync.1634/updates) [•](#262626) [[⏩ Songoda]](gray open_url=https://songoda.com/marketplace/product/husksync-a-modern-cross-server-player-data-synchronization-system.758)"));
} else {
player.sendMessage(new MineDown("[HuskSync](#00fb9a bold) [| HuskSync is up-to-date, running version " + latestVersion));
player.sendMessage(new MineDown("[HuskSync](#00fb9a bold) [| HuskSync is up-to-date, running version " + latestVersion + "](#00fb9a)"));
}
});
}
@ -46,17 +46,17 @@ public class HuskSyncCommand extends CommandBase implements TabCompletable, Cons
return;
}
plugin.reload();
player.sendMessage(new MineDown("[HuskSync](#00fb9a bold) &#00fb9a&| Reloaded config & message files."));
player.sendMessage(new MineDown("[HuskSync](#00fb9a bold) [| Reloaded config & message files.]((#00fb9a)"));
}
default ->
plugin.getLocales().getLocale("error_invalid_syntax", "/husksync <update|info|reload>").ifPresent(player::sendMessage);
plugin.getLocales().getLocale("error_invalid_syntax", "/husksync <update/info/reload>").ifPresent(player::sendMessage);
}
}
@Override
public void onConsoleExecute(@NotNull String[] args) {
if (args.length < 1) {
plugin.getLoggingAdapter().log(Level.INFO, "Console usage: /husksync <update|info|reload|migrate>");
plugin.getLoggingAdapter().log(Level.INFO, "Console usage: /husksync <update/info/reload/migrate>");
return;
}
switch (args[0].toLowerCase()) {
@ -71,7 +71,7 @@ public class HuskSyncCommand extends CommandBase implements TabCompletable, Cons
//todo - MPDB migrator
}
default ->
plugin.getLoggingAdapter().log(Level.INFO, "Invalid syntax. Console usage: /husksync <update|info|reload|migrate>");
plugin.getLoggingAdapter().log(Level.INFO, "Invalid syntax. Console usage: /husksync <update/info/reload/migrate>");
}
}

@ -15,11 +15,11 @@ import java.util.regex.Pattern;
public class Locales {
public static final String PLUGIN_INFORMATION = """
[HuskSync](#00fb9a bold) [| Version %version%(#00fb9a)
[HuskSync](#00fb9a bold) [| Version %version%](#00fb9a)
[A modern, cross-server player data synchronization system](gray)
[ Author:](white) [William278](gray show_text=&7Click to visit website open_url=https://william278.net)
[ Contributors:](white) [HarvelsX](gray show_text=&7Code)
[ Translators:](white) [Namiu/](gray show_text=&7Japanese, ja-jp), [anchelthe](gray show_text=&7Spanish, es-es), [Ceddix](gray show_text=&7German, de-de), [](gray show_text=&7Traditional Chinese, zh-tw), [Ghost-chu](gray show_text=&7Simplified Chinese, zh-cn), [Thourgard](gray show_text=&7Ukrainian, uk-ua)
[ Translators:](white) [Namiu](gray show_text=&7\\(\\) - Japanese, ja-jp), [anchelthe](gray show_text=&7Spanish, es-es), [Ceddix](gray show_text=&7German, de-de), [](gray show_text=&7Traditional Chinese, zh-tw), [Ghost-chu](gray show_text=&7Simplified Chinese, zh-cn), [Thourgard](gray show_text=&7Ukrainian, uk-ua)
[ Plugin Info:](white) [[Link]](#00fb9a show_text=&7Click to open link open_url=https://github.com/WiIIiam278/HuskSync/)
[ Report Issues:](white) [[Link]](#00fb9a show_text=&7Click to open link open_url=https://github.com/WiIIiam278/HuskSync/issues)
[ Support Discord:](white) [[Link]](#00fb9a show_text=&7Click to join open_url=https://discord.gg/tVYhJfyDWG)""";

@ -271,19 +271,19 @@ public class MySqlDatabase extends Database {
protected CompletableFuture<Void> pruneUserDataRecords(@NotNull User user) {
return CompletableFuture.runAsync(() -> getUserData(user).thenAccept(data -> {
if (data.size() > maxUserDataRecords) {
Collections.reverse(data);
data.subList(0, data.size() - maxUserDataRecords).forEach(dataToDelete -> {
try (Connection connection = getConnection()) {
try (PreparedStatement statement = connection.prepareStatement(formatStatementTables("""
DELETE FROM `%data_table%`
WHERE `version_uuid`=?"""))) {
statement.setString(1, dataToDelete.versionUUID().toString());
statement.executeUpdate();
}
} catch (SQLException e) {
getLogger().log(Level.SEVERE, "Failed to prune user data from the database", e);
try (Connection connection = getConnection()) {
try (PreparedStatement statement = connection.prepareStatement(formatStatementTables("""
DELETE FROM `%data_table%`
WHERE `player_uuid`=?
ORDER BY `timestamp` ASC
LIMIT %entry_count%;""".replace("%entry_count%",
Integer.toString(data.size() - maxUserDataRecords))))) {
statement.setString(1, user.uuid.toString());
statement.executeUpdate();
}
});
} catch (SQLException e) {
getLogger().log(Level.SEVERE, "Failed to prune user data from the database", e);
}
}
}));
}
@ -306,7 +306,7 @@ public class MySqlDatabase extends Database {
} catch (SQLException | IOException e) {
getLogger().log(Level.SEVERE, "Failed to set user data in the database", e);
}
})/*.thenRunAsync(() -> pruneUserDataRecords(user).join())*/;
}).thenRun(() -> pruneUserDataRecords(user).join());
}
@Override

@ -2,6 +2,7 @@ package net.william278.husksync.listener;
import net.william278.husksync.HuskSync;
import net.william278.husksync.player.OnlineUser;
import net.william278.husksync.player.User;
import net.william278.husksync.redis.RedisManager;
import org.jetbrains.annotations.NotNull;
@ -9,8 +10,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class EventListener {
public abstract class EventListener {
/**
* The plugin instance
@ -34,30 +39,58 @@ public class EventListener {
}
public final void handlePlayerJoin(@NotNull OnlineUser user) {
if (user.isDead()) {
return;
}
usersAwaitingSync.add(user.uuid);
huskSync.getRedisManager().getUserData(user, RedisManager.RedisKeyType.SERVER_CHANGE).thenAccept(
cachedUserData -> cachedUserData.ifPresentOrElse(
userData -> user.setData(userData, huskSync.getSettings()).join(),
() -> huskSync.getDatabase().getCurrentUserData(user).thenAccept(
databaseUserData -> databaseUserData.ifPresent(
data -> user.setData(data.userData(), huskSync.getSettings()).join())).join())).thenRunAsync(
() -> {
huskSync.getLocales().getLocale("synchronisation_complete").ifPresent(user::sendActionBar);
usersAwaitingSync.remove(user.uuid);
huskSync.getDatabase().ensureUser(user).join();
CompletableFuture.runAsync(() -> huskSync.getRedisManager().getUserServerSwitch(user).thenAccept(changingServers -> {
if (!changingServers) {
// Fetch from the database if the user isn't changing servers
setUserFromDatabase(user).thenRun(() -> handleSynchronisationCompletion(user));
} else {
final int TIME_OUT_MILLISECONDS = 3200;
CompletableFuture.runAsync(() -> {
final AtomicInteger currentMilliseconds = new AtomicInteger(0);
final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
// Set the user as soon as the source server has set the data to redis
executor.scheduleAtFixedRate(() -> {
if (disabling || currentMilliseconds.get() > TIME_OUT_MILLISECONDS) {
executor.shutdown();
setUserFromDatabase(user).thenRun(() -> handleSynchronisationCompletion(user));
return;
}
huskSync.getRedisManager().getUserData(user).thenAccept(redisUserData ->
redisUserData.ifPresent(redisData -> {
user.setData(redisData, huskSync.getSettings()).join();
executor.shutdown();
})).join();
currentMilliseconds.addAndGet(200);
}, 0, 200L, TimeUnit.MILLISECONDS);
});
}
}));
}
private CompletableFuture<Void> setUserFromDatabase(@NotNull OnlineUser user) {
return huskSync.getDatabase().getCurrentUserData(user)
.thenAccept(databaseUserData -> databaseUserData.ifPresent(databaseData -> user
.setData(databaseData.userData(), huskSync.getSettings()).join()));
}
private void handleSynchronisationCompletion(@NotNull OnlineUser user) {
huskSync.getLocales().getLocale("synchronisation_complete").ifPresent(user::sendActionBar);
usersAwaitingSync.remove(user.uuid);
huskSync.getDatabase().ensureUser(user).join();
}
public final void handlePlayerQuit(@NotNull OnlineUser user) {
if (disabling) {
return;
}
user.getUserData().thenAccept(userData -> {
System.out.println(userData.userData().toJson());
huskSync.getRedisManager()
.setUserData(user, userData.userData(), RedisManager.RedisKeyType.SERVER_CHANGE).thenRun(
() -> huskSync.getDatabase().setUserData(user, userData).join());
});
huskSync.getRedisManager().setUserServerSwitch(user).thenRun(() -> user.getUserData().thenAccept(
userData -> huskSync.getRedisManager().setUserData(user, userData.userData()).thenRun(
() -> huskSync.getDatabase().setUserData(user, userData).join())));
}
public final void handleWorldSave(@NotNull List<OnlineUser> usersInWorld) {

@ -39,7 +39,8 @@ public abstract class OnlineUser extends User {
public abstract CompletableFuture<Void> setStatus(@NotNull StatusData statusData,
final boolean setHealth, final boolean setMaxHealth,
final boolean setHunger, final boolean setExperience,
final boolean setGameMode, boolean setFlying);
final boolean setGameMode, final boolean setFlying,
final boolean setSelectedItemSlot);
/**
* Get the player's inventory {@link InventoryData} contents
@ -147,6 +148,20 @@ public abstract class OnlineUser extends User {
*/
public abstract CompletableFuture<Void> setPersistentDataContainer(@NotNull PersistentDataContainerData persistentDataContainerData);
/**
* Indicates if the player is currently dead
*
* @return {@code true} if the player is dead (health <= 0); {@code false} otherwise
*/
public abstract boolean isDead();
/**
* Indicates if the player has gone offline
*
* @return {@code true} if the player has left the server; {@code false} otherwise
*/
public abstract boolean isOffline();
/**
* Set {@link UserData} to a player
*
@ -156,32 +171,45 @@ public abstract class OnlineUser extends User {
*/
public final CompletableFuture<Void> setData(@NotNull UserData data, @NotNull Settings settings) {
return CompletableFuture.runAsync(() -> {
if (settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_INVENTORIES)) {
setInventory(data.getInventoryData()).join();
}
if (settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_ENDER_CHESTS)) {
setEnderChest(data.getEnderChestData()).join();
}
setStatus(data.getStatusData(), settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_HEALTH),
settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_MAX_HEALTH),
settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_HUNGER),
settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_EXPERIENCE),
settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_GAME_MODE),
settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_LOCATION)).join();
if (settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_POTION_EFFECTS)) {
setPotionEffects(data.getPotionEffectData()).join();
}
if (settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_ADVANCEMENTS)) {
setAdvancements(data.getAdvancementData()).join();
}
if (settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_STATISTICS)) {
setStatistics(data.getStatisticData()).join();
}
if (settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_PERSISTENT_DATA_CONTAINER)) {
setPersistentDataContainer(data.getPersistentDataContainerData()).join();
}
if (settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_LOCATION)) {
setLocation(data.getLocationData()).join();
try {
// Don't set offline players
if (isOffline()) {
return;
}
// Don't set dead players
if (isDead()) {
return;
}
if (settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_INVENTORIES)) {
setInventory(data.getInventoryData()).join();
}
if (settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_ENDER_CHESTS)) {
setEnderChest(data.getEnderChestData()).join();
}
setStatus(data.getStatusData(), settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_HEALTH),
settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_MAX_HEALTH),
settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_HUNGER),
settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_EXPERIENCE),
settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_GAME_MODE),
settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_LOCATION),
settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_INVENTORIES)).join();
if (settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_POTION_EFFECTS)) {
setPotionEffects(data.getPotionEffectData()).join();
}
if (settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_ADVANCEMENTS)) {
setAdvancements(data.getAdvancementData()).join();
}
if (settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_STATISTICS)) {
setStatistics(data.getStatisticData()).join();
}
if (settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_PERSISTENT_DATA_CONTAINER)) {
setPersistentDataContainer(data.getPersistentDataContainerData()).join();
}
if (settings.getBooleanValue(Settings.ConfigOption.SYNCHRONIZATION_SYNC_LOCATION)) {
setLocation(data.getLocationData()).join();
}
} catch (Exception e) {
e.printStackTrace();
}
});
}

@ -12,6 +12,7 @@ import redis.clients.jedis.exceptions.JedisException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@ -76,14 +77,14 @@ public class RedisManager {
* @param redisKeyType the type of key to set the data with. This determines the time to live for the data.
* @return a future returning void when complete
*/
public CompletableFuture<Void> setUserData(@NotNull User user, @NotNull UserData userData,
@NotNull RedisKeyType redisKeyType) {
public CompletableFuture<Void> setUserData(@NotNull User user, @NotNull UserData userData) {
try {
return CompletableFuture.runAsync(() -> {
try (Jedis jedis = jedisPool.getResource()) {
// Set the user's data as a compressed byte array of the json using Snappy
jedis.setex(getKey(redisKeyType, user.uuid), redisKeyType.timeToLive,
jedis.setex(getKey(RedisKeyType.DATA_UPDATE, user.uuid), RedisKeyType.DATA_UPDATE.timeToLive,
Snappy.compress(userData.toJson().getBytes(StandardCharsets.UTF_8)));
System.out.println("Set key at " + new Date().getTime());
} catch (IOException e) {
throw new RuntimeException(e);
}
@ -94,21 +95,34 @@ public class RedisManager {
return null;
}
public CompletableFuture<Void> setUserServerSwitch(@NotNull User user) {
return CompletableFuture.runAsync(() -> {
try (Jedis jedis = jedisPool.getResource()) {
jedis.setex(getKey(RedisKeyType.SERVER_SWITCH, user.uuid),
RedisKeyType.SERVER_SWITCH.timeToLive, new byte[0]);
}
});
}
/**
* Fetch a user's data from the Redis server
* Fetch a user's data from the Redis server and consume the key if found
*
* @param user The user to fetch data for
* @param redisKeyType The type of key to fetch
* @return The user's data, if it's present on the database. Otherwise, an empty optional.
*/
public CompletableFuture<Optional<UserData>> getUserData(@NotNull User user,
@NotNull RedisKeyType redisKeyType) {
public CompletableFuture<Optional<UserData>> getUserData(@NotNull User user) {
return CompletableFuture.supplyAsync(() -> {
try (Jedis jedis = jedisPool.getResource()) {
final byte[] compressedJson = jedis.get(getKey(redisKeyType, user.uuid));
final byte[] key = getKey(RedisKeyType.DATA_UPDATE, user.uuid);
System.out.println("Reading key at " + new Date().getTime());
final byte[] compressedJson = jedis.get(key);
if (compressedJson == null) {
return Optional.empty();
}
// Consume the key (delete from redis)
jedis.del(key);
// Use Snappy to decompress the json
return Optional.of(UserData.fromJson(new String(Snappy.uncompress(compressedJson),
StandardCharsets.UTF_8)));
@ -118,6 +132,21 @@ public class RedisManager {
});
}
public CompletableFuture<Boolean> getUserServerSwitch(@NotNull User user) {
return CompletableFuture.supplyAsync(() -> {
try (Jedis jedis = jedisPool.getResource()) {
final byte[] key = getKey(RedisKeyType.SERVER_SWITCH, user.uuid);
final byte[] compressedJson = jedis.get(key);
if (compressedJson == null) {
return false;
}
// Consume the key (delete from redis)
jedis.del(key);
return true;
}
});
}
public void close() {
if (jedisPool != null) {
if (!jedisPool.isClosed()) {
@ -132,7 +161,8 @@ public class RedisManager {
public enum RedisKeyType {
CACHE(60 * 60 * 24),
SERVER_CHANGE(2);
DATA_UPDATE(10),
SERVER_SWITCH(10);
public final int timeToLive;

Loading…
Cancel
Save