diff --git a/src/main/java/DataLock.java b/src/main/java/DataLock.java index 04f4afb..86ae113 100644 --- a/src/main/java/DataLock.java +++ b/src/main/java/DataLock.java @@ -1,4 +1,7 @@ -import java.util.HashMap; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import org.bukkit.scheduler.BukkitRunnable; + import java.util.HashSet; import java.util.UUID; @@ -12,36 +15,64 @@ public class DataLock implements DataLockInterface { return instance; } - private DataLock() {} - - private final HashMap activeQueries = new HashMap<>(); - - protected void putQuery(IdempotencyData idempotencyData) { - activeQueries.put(idempotencyData.idempotencyToken(), idempotencyData); + private final PluginMessageListener pluginMessageListener; + private final DataLockLib plugin; + private final Idempotency activeRequests; + private DataLock() { + pluginMessageListener = new PluginMessageListener(); + plugin = DataLockLib.getInstance(); + activeRequests = new Idempotency(); + new RepeatRequest().runTaskTimerAsynchronously(plugin, 15 * 20, 15 * 20); + //Run repeat request task every 15 seconds } - protected IdempotencyData getQuery(UUID idempotencyToken) { - return activeQueries.getOrDefault(idempotencyToken, null); - } - - protected IdempotencyData removeQuery(UUID idempotencyToken) { - return activeQueries.remove(idempotencyToken); + private void sendPluginMessage(RequestType requestType, IdempotencyData idempotencyData) { + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + out.writeUTF(requestType.subChannel); + out.writeUTF(idempotencyData.data()); + plugin.getServer().sendPluginMessage(plugin, idempotencyData.channel(), out.toByteArray()); } private final HashSet activeChannels = new HashSet<>(); - public void registerChannel(String channel) { + public boolean removeActiveRequest(RequestType requestType, IdempotencyData idempotencyData) { + return activeRequests.removeIdempotencyData(requestType, idempotencyData); + } + @Override + public synchronized void registerChannel(String channel) { + activeChannels.add(channel); + plugin.getServer().getMessenger().registerOutgoingPluginChannel(plugin, channel); + plugin.getServer().getMessenger().registerIncomingPluginChannel(plugin, channel, pluginMessageListener); + } + + @Override + public synchronized boolean isActiveChannel(String channel) { + return activeChannels.contains(channel); } @Override public void tryLock(String channel, String data) { IdempotencyData idempotencyData = new IdempotencyData(channel, data, UUID.randomUUID()); - + activeRequests.putIdempotencyData(RequestType.TRY_LOCK, idempotencyData); + sendPluginMessage(RequestType.TRY_LOCK, idempotencyData); } @Override public void tryUnlock(String channel, String data) { + IdempotencyData idempotencyData = new IdempotencyData(channel, data, UUID.randomUUID()); + activeRequests.putIdempotencyData(RequestType.TRY_UNLOCK, idempotencyData); + sendPluginMessage(RequestType.TRY_UNLOCK, idempotencyData); + } + private class RepeatRequest extends BukkitRunnable { + @Override + public void run() { + for (RequestType requestType : RequestType.values()) { + for (IdempotencyData next : activeRequests.getIdempotencyData(requestType)) { + sendPluginMessage(requestType, next); + } + } + } } } diff --git a/src/main/java/DataLockInterface.java b/src/main/java/DataLockInterface.java index 81c2f21..46473cf 100644 --- a/src/main/java/DataLockInterface.java +++ b/src/main/java/DataLockInterface.java @@ -4,4 +4,8 @@ public interface DataLockInterface { void tryUnlock(String channel, String data); + void registerChannel(String channel); + + boolean isActiveChannel(String channel); + } diff --git a/src/main/java/DataLockLib.java b/src/main/java/DataLockLib.java index b7e2e4b..aac417f 100644 --- a/src/main/java/DataLockLib.java +++ b/src/main/java/DataLockLib.java @@ -15,7 +15,6 @@ public class DataLockLib extends JavaPlugin { @Override public void onEnable() { - registerEvents(); } @Override @@ -23,10 +22,4 @@ public class DataLockLib extends JavaPlugin { } - private void registerEvents() { -// getServer().getPluginManager().registerEvents(new TalkToQuest(), this); -// getServer().getMessenger().registerOutgoingPluginChannel(this, "aquest:player-data"); -// getServer().getMessenger().registerIncomingPluginChannel(this, "aquest:player-data", new PluginMessageListener()); - } - } diff --git a/src/main/java/DataLockListener.java b/src/main/java/DataLockListener.java deleted file mode 100644 index 5054711..0000000 --- a/src/main/java/DataLockListener.java +++ /dev/null @@ -1,5 +0,0 @@ -import org.bukkit.event.Listener; - -public class DataLockListener implements Listener { - -} diff --git a/src/main/java/Idempotency.java b/src/main/java/Idempotency.java new file mode 100644 index 0000000..0c42448 --- /dev/null +++ b/src/main/java/Idempotency.java @@ -0,0 +1,49 @@ +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; + +public class Idempotency { + + private final HashMap> idempotencyMap; + + public Idempotency() { + idempotencyMap = new HashMap<>(); + } + + private HashSet getIdempotencySet(RequestType requestType) { + return idempotencyMap.getOrDefault(requestType, new HashSet<>()); + } + + private void putIdempotencySet(RequestType requestType, HashSet idempotencySet) { + idempotencyMap.put(requestType, idempotencySet); + } + + /** + * Add IdempotencyData to the list of active queries + * @param idempotencyData Data to add to the set + * @return true if entry did not exist yet + */ + public synchronized boolean putIdempotencyData(RequestType requestType, IdempotencyData idempotencyData) { + HashSet idempotencySet = getIdempotencySet(requestType); + boolean result = idempotencySet.add(idempotencyData); + putIdempotencySet(requestType, idempotencySet); + return result; + } + + /** + * Remove IdempotencyData from the list of active queries + * @param idempotencyData Data to remove from the set + * @return True if the data that was requested to be removed was in the set and was removed + */ + public synchronized boolean removeIdempotencyData(RequestType requestType, IdempotencyData idempotencyData) { + HashSet idempotencySet = getIdempotencySet(requestType); + boolean result = idempotencySet.remove(idempotencyData); + putIdempotencySet(requestType, idempotencySet); + return result; + } + + public synchronized Set getIdempotencyData(RequestType requestType) { + return Collections.unmodifiableSet(getIdempotencySet(requestType)); + } +} diff --git a/src/main/java/LockResponseEvent.java b/src/main/java/LockResponseEvent.java index 39237c0..d5034d2 100644 --- a/src/main/java/LockResponseEvent.java +++ b/src/main/java/LockResponseEvent.java @@ -6,12 +6,14 @@ public class LockResponseEvent extends Event { private final HandlerList handlers = new HandlerList(); private final String channel; + private final ResponseType responseType; private final String data; private final boolean result; protected LockResponseEvent(boolean isAsync, String channel, ResponseType responseType, String data, boolean result) { super(isAsync); this.channel = channel; + this.responseType = responseType; this.data = data; this.result = result; } @@ -24,8 +26,6 @@ public class LockResponseEvent extends Event { return data; } - - public @NotNull HandlerList getHandlers() { return handlers; } diff --git a/src/main/java/PluginMessageListener.java b/src/main/java/PluginMessageListener.java index 21bc180..d86f091 100644 --- a/src/main/java/PluginMessageListener.java +++ b/src/main/java/PluginMessageListener.java @@ -4,29 +4,61 @@ import org.bukkit.entity.Player; import org.bukkit.scheduler.BukkitRunnable; import org.jetbrains.annotations.NotNull; -import java.util.HashSet; import java.util.UUID; public class PluginMessageListener implements org.bukkit.plugin.messaging.PluginMessageListener { + private final DataLock dataLock; + private final Idempotency alreadyReceived; + PluginMessageListener() { + this.dataLock = DataLock.getInstance(); + this.alreadyReceived = new Idempotency(); + } + @Override public void onPluginMessageReceived(@NotNull String channel, @NotNull Player player, byte[] bytes) { - if (!activeChannels.contains(channel)) { + if (!dataLock.isActiveChannel(channel)) { return; } ByteArrayDataInput in = ByteStreams.newDataInput(bytes); String data = in.readUTF(); boolean result = in.readBoolean(); UUID idempotency = UUID.fromString(in.readUTF()); + IdempotencyData idempotencyData = new IdempotencyData(channel, data, idempotency); new BukkitRunnable() { @Override public void run() { switch (in.readUTF()) { - case "try-lock-result" -> new LockResponseEvent(true, channel, ResponseType.TRY_LOCK_RESULT, data, result); - case "queue-lock-failed" -> new LockResponseEvent(true, channel, ResponseType.QUEUE_LOCK_FAILED, data, result); - case "try-unlock-result" -> new LockResponseEvent(true, channel, ResponseType.TRY_UNLOCK_RESULT, data, result); - case "locked-queue-lock" -> new LockResponseEvent(true, channel, ResponseType.LOCKED_QUEUE_LOCK, data, result); - case "check-lock-result" -> new LockResponseEvent(true, channel, ResponseType.CHECK_LOCK_RESULT, data, result); + case "try-lock-result" -> { + if (!alreadyReceived.putIdempotencyData(RequestType.TRY_LOCK, idempotencyData)) + return; + DataLock.getInstance().removeActiveRequest(RequestType.TRY_LOCK, idempotencyData); + new LockResponseEvent(true, channel, ResponseType.TRY_LOCK_RESULT, data, result); + } + case "queue-lock-failed" -> { + if (!alreadyReceived.putIdempotencyData(RequestType.TRY_LOCK, idempotencyData)) + return; + DataLock.getInstance().removeActiveRequest(RequestType.TRY_LOCK, idempotencyData); + new LockResponseEvent(true, channel, ResponseType.QUEUE_LOCK_FAILED, data, result); + } + case "try-unlock-result" -> { + if (!alreadyReceived.putIdempotencyData(RequestType.TRY_UNLOCK, idempotencyData)) + return; + DataLock.getInstance().removeActiveRequest(RequestType.TRY_UNLOCK, idempotencyData); + new LockResponseEvent(true, channel, ResponseType.TRY_UNLOCK_RESULT, data, result); + } + case "locked-queue-lock" -> { + if (!alreadyReceived.putIdempotencyData(RequestType.TRY_LOCK, idempotencyData)) + return; + DataLock.getInstance().removeActiveRequest(RequestType.TRY_LOCK, idempotencyData); + new LockResponseEvent(true, channel, ResponseType.LOCKED_QUEUE_LOCK, data, result); + } + case "check-lock-result" -> { + if (!alreadyReceived.putIdempotencyData(RequestType.CHECK_LOCK, idempotencyData)) + return; + DataLock.getInstance().removeActiveRequest(RequestType.CHECK_LOCK, idempotencyData); + new LockResponseEvent(true, channel, ResponseType.CHECK_LOCK_RESULT, data, result); + } } } }.runTaskAsynchronously(DataLockLib.getInstance()); diff --git a/src/main/java/RequestType.java b/src/main/java/RequestType.java new file mode 100644 index 0000000..a229202 --- /dev/null +++ b/src/main/java/RequestType.java @@ -0,0 +1,11 @@ +public enum RequestType { + TRY_LOCK("try-lock"), + TRY_UNLOCK("try-unlock"), + CHECK_LOCK("check-lock"); + + String subChannel; + + RequestType(String subChannel) { + this.subChannel = subChannel; + } +}