refactor: data save event order processing, use new method in DataSyncer (#243)

* fix: fire DataSaveEvent before disconnect

* fix: revert rename `addSnapshot`

* docs: mention `addSnapshot` firing the API event

* refactor: use DataSyncer method for event saving, close #242

* fix: trailing semicolon
feat/data-edit-commands
William 1 year ago committed by GitHub
parent f6773f4e68
commit 12e223618d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -31,11 +31,13 @@ import net.william278.husksync.user.OnlineUser;
import net.william278.husksync.user.User; import net.william278.husksync.user.User;
import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
/** /**
* The common implementation of the HuskSync API, containing cross-platform API calls. * The common implementation of the HuskSync API, containing cross-platform API calls.
@ -262,13 +264,32 @@ public class HuskSyncAPI {
* *
* @param user The user to save the data for * @param user The user to save the data for
* @param snapshot The snapshot to save * @param snapshot The snapshot to save
* @param callback A callback to run after the data has been saved (if the DataSaveEvent was not cancelled)
* @apiNote This will fire the {@link net.william278.husksync.event.DataSaveEvent} event, unless
* the save cause is {@link DataSnapshot.SaveCause#SERVER_SHUTDOWN}
* @since 3.3.2
*/
public void addSnapshot(@NotNull User user, @NotNull DataSnapshot snapshot,
@Nullable BiConsumer<User, DataSnapshot.Packed> callback) {
plugin.runAsync(() -> plugin.getDataSyncer().saveData(
user,
snapshot instanceof DataSnapshot.Unpacked unpacked
? unpacked.pack(plugin) : (DataSnapshot.Packed) snapshot,
callback
));
}
/**
* Adds a data snapshot to the database
*
* @param user The user to save the data for
* @param snapshot The snapshot to save
* @apiNote This will fire the {@link net.william278.husksync.event.DataSaveEvent} event, unless
* * the save cause is {@link DataSnapshot.SaveCause#SERVER_SHUTDOWN}
* @since 3.0 * @since 3.0
*/ */
public void addSnapshot(@NotNull User user, @NotNull DataSnapshot snapshot) { public void addSnapshot(@NotNull User user, @NotNull DataSnapshot snapshot) {
plugin.runAsync(() -> plugin.getDatabase().addSnapshot( this.addSnapshot(user, snapshot, null);
user, snapshot instanceof DataSnapshot.Unpacked unpacked
? unpacked.pack(plugin) : (DataSnapshot.Packed) snapshot
));
} }
/** /**

@ -72,8 +72,8 @@ public class EnderChestCommand extends ItemsCommand {
// Creates a new snapshot with the updated enderChest // Creates a new snapshot with the updated enderChest
@SuppressWarnings("DuplicatedCode") @SuppressWarnings("DuplicatedCode")
private void updateItems(@NotNull OnlineUser viewer, @NotNull Data.Items.Items items, @NotNull User user) { private void updateItems(@NotNull OnlineUser viewer, @NotNull Data.Items.Items items, @NotNull User holder) {
final Optional<DataSnapshot.Packed> latestData = plugin.getDatabase().getLatestSnapshot(user); final Optional<DataSnapshot.Packed> latestData = plugin.getDatabase().getLatestSnapshot(holder);
if (latestData.isEmpty()) { if (latestData.isEmpty()) {
plugin.getLocales().getLocale("error_no_data_to_display") plugin.getLocales().getLocale("error_no_data_to_display")
.ifPresent(viewer::sendMessage); .ifPresent(viewer::sendMessage);
@ -90,10 +90,12 @@ public class EnderChestCommand extends ItemsCommand {
); );
}); });
// Save data
final RedisManager redis = plugin.getRedisManager(); final RedisManager redis = plugin.getRedisManager();
plugin.getDatabase().addSnapshot(user, snapshot); plugin.getDataSyncer().saveData(holder, snapshot, (user, data) -> {
redis.sendUserDataUpdate(user, snapshot); redis.getUserData(user).ifPresent(d -> redis.setUserData(user, snapshot, RedisKeyType.TTL_1_YEAR));
redis.getUserData(user).ifPresent(data -> redis.setUserData(user, snapshot, RedisKeyType.TTL_1_YEAR)); redis.sendUserDataUpdate(user, data);
});
} }
} }

@ -72,8 +72,8 @@ public class InventoryCommand extends ItemsCommand {
// Creates a new snapshot with the updated inventory // Creates a new snapshot with the updated inventory
@SuppressWarnings("DuplicatedCode") @SuppressWarnings("DuplicatedCode")
private void updateItems(@NotNull OnlineUser viewer, @NotNull Data.Items.Items items, @NotNull User user) { private void updateItems(@NotNull OnlineUser viewer, @NotNull Data.Items.Items items, @NotNull User holder) {
final Optional<DataSnapshot.Packed> latestData = plugin.getDatabase().getLatestSnapshot(user); final Optional<DataSnapshot.Packed> latestData = plugin.getDatabase().getLatestSnapshot(holder);
if (latestData.isEmpty()) { if (latestData.isEmpty()) {
plugin.getLocales().getLocale("error_no_data_to_display") plugin.getLocales().getLocale("error_no_data_to_display")
.ifPresent(viewer::sendMessage); .ifPresent(viewer::sendMessage);
@ -90,10 +90,12 @@ public class InventoryCommand extends ItemsCommand {
); );
}); });
// Save data
final RedisManager redis = plugin.getRedisManager(); final RedisManager redis = plugin.getRedisManager();
plugin.getDatabase().addSnapshot(user, snapshot); plugin.getDataSyncer().saveData(holder, snapshot, (user, data) -> {
redis.sendUserDataUpdate(user, snapshot); redis.getUserData(user).ifPresent(d -> redis.setUserData(user, snapshot, RedisKeyType.TTL_1_YEAR));
redis.getUserData(user).ifPresent(data -> redis.setUserData(user, snapshot, RedisKeyType.TTL_1_YEAR)); redis.sendUserDataUpdate(user, data);
});
} }
} }

@ -21,6 +21,8 @@ package net.william278.husksync.command;
import net.william278.husksync.HuskSync; import net.william278.husksync.HuskSync;
import net.william278.husksync.data.DataSnapshot; import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.redis.RedisKeyType;
import net.william278.husksync.redis.RedisManager;
import net.william278.husksync.user.CommandUser; import net.william278.husksync.user.CommandUser;
import net.william278.husksync.user.User; import net.william278.husksync.user.User;
import net.william278.husksync.util.DataDumper; import net.william278.husksync.util.DataDumper;
@ -152,11 +154,14 @@ public class UserDataCommand extends Command implements TabProvider {
); );
})); }));
// Set the user's data and send a message // Save data
plugin.getDatabase().addSnapshot(user, data); final RedisManager redis = plugin.getRedisManager();
plugin.getRedisManager().sendUserDataUpdate(user, data); plugin.getDataSyncer().saveData(user, data, (u, s) -> {
plugin.getLocales().getLocale("data_restored", user.getUsername(), user.getUuid().toString(), redis.getUserData(u).ifPresent(d -> redis.setUserData(u, s, RedisKeyType.TTL_1_YEAR));
data.getShortId(), data.getId().toString()).ifPresent(executor::sendMessage); redis.sendUserDataUpdate(u, s);
plugin.getLocales().getLocale("data_restored", u.getUsername(), u.getUuid().toString(),
s.getShortId(), s.getId().toString()).ifPresent(executor::sendMessage);
});
} }
case "pin" -> { case "pin" -> {

@ -23,8 +23,6 @@ import lombok.Getter;
import net.william278.husksync.HuskSync; import net.william278.husksync.HuskSync;
import net.william278.husksync.config.Settings; import net.william278.husksync.config.Settings;
import net.william278.husksync.data.DataSnapshot; import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.data.DataSnapshot.SaveCause;
import net.william278.husksync.data.UserDataHolder;
import net.william278.husksync.user.User; import net.william278.husksync.user.User;
import org.jetbrains.annotations.Blocking; import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
@ -33,6 +31,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.util.*; import java.util.*;
import java.util.function.BiConsumer;
/** /**
* An abstract representation of the plugin database, storing player data. * An abstract representation of the plugin database, storing player data.
@ -156,42 +155,23 @@ public abstract class Database {
@Blocking @Blocking
public abstract boolean deleteSnapshot(@NotNull User user, @NotNull UUID versionUuid); public abstract boolean deleteSnapshot(@NotNull User user, @NotNull UUID versionUuid);
/**
* Save user data to the database
* </p>
* This will remove the oldest data for the user if the amount of data exceeds the limit as configured
*
* @param user The user to add data for
* @param snapshot The {@link DataSnapshot} to set.
* The implementation should version it with a random UUID and the current timestamp during insertion.
* @see UserDataHolder#createSnapshot(SaveCause)
*/
@Blocking
public void addSnapshot(@NotNull User user, @NotNull DataSnapshot.Packed snapshot) {
if (snapshot.getSaveCause() != SaveCause.SERVER_SHUTDOWN) {
plugin.fireEvent(
plugin.getDataSaveEvent(user, snapshot),
(event) -> this.addAndRotateSnapshot(user, snapshot)
);
return;
}
this.addAndRotateSnapshot(user, snapshot);
}
/** /**
* <b>Internal</b> - Save user data to the database. This will: * Save user data to the database, doing the following (in order):
* <ol> * <ol>
* <li>Delete their most recent snapshot, if it was created before the backup frequency time</li> * <li>Delete their most recent snapshot, if it was created before the backup frequency time</li>
* <li>Create the snapshot</li> * <li>Create the snapshot</li>
* <li>Rotate snapshot backups</li> * <li>Rotate snapshot backups</li>
* </ol> * </ol>
* This is an expensive blocking method and should be run off the main thread.
* *
* @param user The user to add data for * @param user The user to add data for
* @param snapshot The {@link DataSnapshot} to set. * @param snapshot The {@link DataSnapshot} to set.
* @apiNote Prefer {@link net.william278.husksync.sync.DataSyncer#saveData(User, DataSnapshot.Packed, BiConsumer)}.
* </p>This method will not fire the {@link net.william278.husksync.event.DataSaveEvent}
*/ */
@Blocking @Blocking
private void addAndRotateSnapshot(@NotNull User user, @NotNull DataSnapshot.Packed snapshot) { public void addSnapshot(@NotNull User user, @NotNull DataSnapshot.Packed snapshot) {
final int backupFrequency = plugin.getSettings().getSynchronization().getSnapshotBackupFrequency(); final int backupFrequency = plugin.getSettings().getSynchronization().getSnapshotBackupFrequency();
if (!snapshot.isPinned() && backupFrequency > 0) { if (!snapshot.isPinned() && backupFrequency > 0) {
this.rotateLatestSnapshot(user, snapshot.getTimestamp().minusHours(backupFrequency)); this.rotateLatestSnapshot(user, snapshot.getTimestamp().minusHours(backupFrequency));

@ -81,8 +81,8 @@ public abstract class EventListener {
} }
usersInWorld.stream() usersInWorld.stream()
.filter(user -> !plugin.isLocked(user.getUuid()) && !user.isNpc()) .filter(user -> !plugin.isLocked(user.getUuid()) && !user.isNpc())
.forEach(user -> plugin.getDatabase().addSnapshot( .forEach(user -> plugin.getDataSyncer().saveData(
user, user.createSnapshot(DataSnapshot.SaveCause.WORLD_SAVE) user, user.createSnapshot(DataSnapshot.SaveCause.WORLD_SAVE), null
)); ));
} }
@ -101,7 +101,7 @@ public abstract class EventListener {
final DataSnapshot.Packed snapshot = user.createSnapshot(DataSnapshot.SaveCause.DEATH); final DataSnapshot.Packed snapshot = user.createSnapshot(DataSnapshot.SaveCause.DEATH);
snapshot.edit(plugin, (data -> data.getInventory().ifPresent(inventory -> inventory.setContents(items)))); snapshot.edit(plugin, (data -> data.getInventory().ifPresent(inventory -> inventory.setContents(items))));
plugin.getDatabase().addSnapshot(user, snapshot); plugin.getDataSyncer().saveData(user, snapshot, (u, d) -> plugin.getRedisManager().sendUserDataUpdate(u, d));
} }
/** /**
@ -123,7 +123,9 @@ public abstract class EventListener {
.filter(user -> !plugin.isLocked(user.getUuid()) && !user.isNpc()) .filter(user -> !plugin.isLocked(user.getUuid()) && !user.isNpc())
.forEach(user -> { .forEach(user -> {
plugin.lockPlayer(user.getUuid()); plugin.lockPlayer(user.getUuid());
plugin.getDatabase().addSnapshot(user, user.createSnapshot(DataSnapshot.SaveCause.SERVER_SHUTDOWN)); plugin.getDataSyncer().saveData(
user, user.createSnapshot(DataSnapshot.SaveCause.SERVER_SHUTDOWN), null
);
}); });
// Close outstanding connections // Close outstanding connections

@ -22,14 +22,20 @@ package net.william278.husksync.sync;
import net.william278.husksync.HuskSync; import net.william278.husksync.HuskSync;
import net.william278.husksync.api.HuskSyncAPI; import net.william278.husksync.api.HuskSyncAPI;
import net.william278.husksync.data.DataSnapshot; import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.database.Database;
import net.william278.husksync.redis.RedisManager;
import net.william278.husksync.user.OnlineUser; import net.william278.husksync.user.OnlineUser;
import net.william278.husksync.user.User;
import net.william278.husksync.util.Task; import net.william278.husksync.util.Task;
import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -87,6 +93,42 @@ public abstract class DataSyncer {
*/ */
public abstract void saveUserData(@NotNull OnlineUser user); public abstract void saveUserData(@NotNull OnlineUser user);
/**
* Save a {@link DataSnapshot.Packed user's data snapshot} to the database,
* first firing the {@link net.william278.husksync.event.DataSaveEvent}. This will not update data on Redis.
*
* @param user the user to save the data for
* @param data the data to save
* @param after a consumer to run after data has been saved. Will be run async (off the main thread).
* @apiNote Data will not be saved if the {@link net.william278.husksync.event.DataSaveEvent} is cancelled.
* Note that this method can also edit the data before saving it.
* @implNote Note that the {@link net.william278.husksync.event.DataSaveEvent} will <b>not</b> be fired if the
* save cause is {@link DataSnapshot.SaveCause#SERVER_SHUTDOWN}.
* @since 3.3.2
*/
@Blocking
public void saveData(@NotNull User user, @NotNull DataSnapshot.Packed data,
@Nullable BiConsumer<User, DataSnapshot.Packed> after) {
if (data.getSaveCause() == DataSnapshot.SaveCause.SERVER_SHUTDOWN) {
addSnapshotToDatabase(user, data, after);
return;
}
plugin.fireEvent(
plugin.getDataSaveEvent(user, data),
(event) -> addSnapshotToDatabase(user, data, after)
);
}
// Adds a snapshot to the database and runs the after consumer
@Blocking
private void addSnapshotToDatabase(@NotNull User user, @NotNull DataSnapshot.Packed data,
@Nullable BiConsumer<User, DataSnapshot.Packed> after) {
getDatabase().addSnapshot(user, data);
if (after != null) {
after.accept(user, data);
}
}
// Calculates the max attempts the system should listen for user data for based on the latency value // Calculates the max attempts the system should listen for user data for based on the latency value
private long getMaxListenAttempts() { private long getMaxListenAttempts() {
return BASE_LISTEN_ATTEMPTS + ( return BASE_LISTEN_ATTEMPTS + (
@ -98,7 +140,7 @@ public abstract class DataSyncer {
// Set a user's data from the database, or set them as a new user // Set a user's data from the database, or set them as a new user
@ApiStatus.Internal @ApiStatus.Internal
protected void setUserFromDatabase(@NotNull OnlineUser user) { protected void setUserFromDatabase(@NotNull OnlineUser user) {
plugin.getDatabase().getLatestSnapshot(user).ifPresentOrElse( getDatabase().getLatestSnapshot(user).ifPresentOrElse(
snapshot -> user.applySnapshot(snapshot, DataSnapshot.UpdateCause.SYNCHRONIZED), snapshot -> user.applySnapshot(snapshot, DataSnapshot.UpdateCause.SYNCHRONIZED),
() -> user.completeSync(true, DataSnapshot.UpdateCause.NEW_USER, plugin) () -> user.completeSync(true, DataSnapshot.UpdateCause.NEW_USER, plugin)
); );
@ -139,6 +181,16 @@ public abstract class DataSyncer {
task.get().run(); task.get().run();
} }
@NotNull
protected RedisManager getRedis() {
return plugin.getRedisManager();
}
@NotNull
protected Database getDatabase() {
return plugin.getDatabase();
}
/** /**
* Represents the different available default modes of {@link DataSyncer} * Represents the different available default modes of {@link DataSyncer}
* *

@ -39,7 +39,7 @@ public class DelayDataSyncer extends DataSyncer {
plugin.runAsyncDelayed( plugin.runAsyncDelayed(
() -> { () -> {
// Fetch from the database if the user isn't changing servers // Fetch from the database if the user isn't changing servers
if (!plugin.getRedisManager().getUserServerSwitch(user)) { if (!getRedis().getUserServerSwitch(user)) {
this.setUserFromDatabase(user); this.setUserFromDatabase(user);
return; return;
} }
@ -47,7 +47,7 @@ public class DelayDataSyncer extends DataSyncer {
// Listen for the data to be updated // Listen for the data to be updated
this.listenForRedisData( this.listenForRedisData(
user, user,
() -> plugin.getRedisManager().getUserData(user).map(data -> { () -> getRedis().getUserData(user).map(data -> {
user.applySnapshot(data, DataSnapshot.UpdateCause.SYNCHRONIZED); user.applySnapshot(data, DataSnapshot.UpdateCause.SYNCHRONIZED);
return true; return true;
}).orElse(false) }).orElse(false)
@ -58,12 +58,13 @@ public class DelayDataSyncer extends DataSyncer {
} }
@Override @Override
public void saveUserData(@NotNull OnlineUser user) { public void saveUserData(@NotNull OnlineUser onlineUser) {
plugin.runAsync(() -> { plugin.runAsync(() -> {
plugin.getRedisManager().setUserServerSwitch(user); getRedis().setUserServerSwitch(onlineUser);
final DataSnapshot.Packed data = user.createSnapshot(DataSnapshot.SaveCause.DISCONNECT); saveData(
plugin.getRedisManager().setUserData(user, data, RedisKeyType.TTL_10_SECONDS); onlineUser, onlineUser.createSnapshot(DataSnapshot.SaveCause.DISCONNECT),
plugin.getDatabase().addSnapshot(user, data); (user, data) -> getRedis().setUserData(user, data, RedisKeyType.TTL_10_SECONDS)
);
}); });
} }

@ -33,21 +33,21 @@ public class LockstepDataSyncer extends DataSyncer {
@Override @Override
public void initialize() { public void initialize() {
plugin.getRedisManager().clearUsersCheckedOutOnServer(); getRedis().clearUsersCheckedOutOnServer();
} }
@Override @Override
public void terminate() { public void terminate() {
plugin.getRedisManager().clearUsersCheckedOutOnServer(); getRedis().clearUsersCheckedOutOnServer();
} }
// Consume their data when they are checked in // Consume their data when they are checked in
@Override @Override
public void setUserData(@NotNull OnlineUser user) { public void setUserData(@NotNull OnlineUser user) {
this.listenForRedisData(user, () -> { this.listenForRedisData(user, () -> {
if (plugin.getRedisManager().getUserCheckedOut(user).isEmpty()) { if (getRedis().getUserCheckedOut(user).isEmpty()) {
plugin.getRedisManager().setUserCheckedOut(user, true); getRedis().setUserCheckedOut(user, true);
plugin.getRedisManager().getUserData(user).ifPresentOrElse( getRedis().getUserData(user).ifPresentOrElse(
data -> user.applySnapshot(data, DataSnapshot.UpdateCause.SYNCHRONIZED), data -> user.applySnapshot(data, DataSnapshot.UpdateCause.SYNCHRONIZED),
() -> this.setUserFromDatabase(user) () -> this.setUserFromDatabase(user)
); );
@ -58,12 +58,16 @@ public class LockstepDataSyncer extends DataSyncer {
} }
@Override @Override
public void saveUserData(@NotNull OnlineUser user) { public void saveUserData(@NotNull OnlineUser onlineUser) {
plugin.runAsync(() -> { plugin.runAsync(() -> {
final DataSnapshot.Packed data = user.createSnapshot(DataSnapshot.SaveCause.DISCONNECT); getRedis().setUserServerSwitch(onlineUser);
plugin.getRedisManager().setUserData(user, data, RedisKeyType.TTL_1_YEAR); saveData(
plugin.getRedisManager().setUserCheckedOut(user, false); onlineUser, onlineUser.createSnapshot(DataSnapshot.SaveCause.DISCONNECT),
plugin.getDatabase().addSnapshot(user, data); (user, data) -> {
getRedis().setUserData(user, data, RedisKeyType.TTL_1_YEAR);
getRedis().setUserCheckedOut(user, false);
}
);
}); });
} }

Loading…
Cancel
Save