diff --git a/src/main/java/com/alttd/datalock/EventListener.java b/src/main/java/com/alttd/datalock/EventListener.java index 5688252..c428e3b 100644 --- a/src/main/java/com/alttd/datalock/EventListener.java +++ b/src/main/java/com/alttd/datalock/EventListener.java @@ -17,6 +17,9 @@ import java.util.stream.Collectors; public class EventListener { + private EventListener() {} + +// private final Idempotency idempotencyStorage = new Idempotency(); private final HashMap> queuedLocks = new HashMap<>(); private final HashMap> channelLockMap = new HashMap<>(); private final List channelIdentifierList = new ArrayList<>(); @@ -61,7 +64,7 @@ public class EventListener { } for (Lock lock : temp) { locks.remove(lock); - queueNextLock(locks, lock, key); + queueNextLock(locks, lock, key, null); if (Config.DEBUG) Logger.info("Clearing % from % due to clear server being called for the server that lock is on", lock.getData(), key.getId()); } @@ -132,13 +135,34 @@ public class EventListener { if (!isValid(channel.toLowerCase(), data)) return; + UUID idempotency; + try { + idempotency = UUID.fromString(in.readUTF()); + } catch (Exception e) { + Logger.error("No idempotency key found.", + identifier.getId()); + idempotency = null; //TODO change this to return + } + + if (Config.DEBUG) Logger.info("Plugin message channel: [%]", channel.toLowerCase()); - switch (channel.toLowerCase()) { - case "try-lock" -> tryLock(identifier, hashLock, data, serverConnection); - case "check-lock" -> checkLock(identifier, hashLock, data, serverConnection); - case "try-unlock" -> tryUnlock(identifier, hashLock, data, serverConnection); + Optional first = Arrays.stream(RequestType.values()).filter(value -> value.subChannel.equalsIgnoreCase(channel)).findFirst(); + if (first.isEmpty()) { + Logger.warn("Received invalid request type [%]", channel.toLowerCase()); + return; + } +// RequestType requestType = first.get(); +// if (!idempotencyStorage.putIdempotencyData(requestType, new IdempotencyData(requestType, data, idempotency))) +// return; +// switch (requestType) { +// +// } + switch (channel.toLowerCase()) { //TODO something with idempotency for function + case "try-lock" -> tryLock(identifier, hashLock, data, idempotency, serverConnection); + case "check-lock" -> checkLock(identifier, hashLock, data, idempotency, serverConnection); + case "try-unlock" -> tryUnlock(identifier, hashLock, data, idempotency, serverConnection); } } @@ -159,16 +183,21 @@ public class EventListener { } } - private void tryLock(ChannelIdentifier identifier, HashSet lockSet, String data, ServerConnection serverConnection) { + private void sendPluginMessage(String channel, boolean result, String data, UUID idempotency, ServerConnection serverConnection, ChannelIdentifier identifier) { ByteArrayDataOutput out = ByteStreams.newDataOutput(); - out.writeUTF("try-lock-result"); + out.writeUTF(channel); + out.writeBoolean(result); + out.writeUTF(data); + out.writeUTF(idempotency.toString()); + serverConnection.sendPluginMessage(identifier, out.toByteArray()); + } + private void tryLock(ChannelIdentifier identifier, HashSet lockSet, String data, UUID idempotency, ServerConnection serverConnection) { + String channel = "try-lock-result"; Lock lock = new Lock(serverConnection.getServerInfo().hashCode(), data); if (lockSet.contains(lock)) { //An entry from this server already exists, so we can say that it's locked - out.writeBoolean(true); - out.writeUTF(lock.getData()); - serverConnection.sendPluginMessage(identifier, out.toByteArray()); + sendPluginMessage(channel, true, lock.getData(), idempotency, serverConnection, identifier); return; } @@ -184,10 +213,8 @@ public class EventListener { } else { //An entry from another server exists, so we can't lock it - out.writeBoolean(false); - out.writeUTF(lock.getData()); - serverConnection.sendPluginMessage(identifier, out.toByteArray()); - queueLock(queuedLocks.getOrDefault(identifier, new HashSet<>()), identifier, lock, serverConnection); + sendPluginMessage(channel, false, lock.getData(), idempotency, serverConnection, identifier); + queueLock(queuedLocks.getOrDefault(identifier, new HashSet<>()), identifier, lock, serverConnection, idempotency); return; } } @@ -195,42 +222,35 @@ public class EventListener { //Lock the data lockSet.add(lock); channelLockMap.put(identifier, lockSet); - - out.writeBoolean(true); - out.writeUTF(lock.getData()); - serverConnection.sendPluginMessage(identifier, out.toByteArray()); + sendPluginMessage(channel, true, lock.getData(), idempotency, serverConnection, identifier); } - private void checkLock(ChannelIdentifier identifier, HashSet lockSet, String data, ServerConnection serverConnection) { - ByteArrayDataOutput out = ByteStreams.newDataOutput(); + private void checkLock(ChannelIdentifier identifier, HashSet lockSet, String data, UUID idempotency, ServerConnection serverConnection) { Lock lock = new Lock(serverConnection.hashCode(), data); + String channel = "check-lock-result"; + boolean result; - out.writeUTF("check-lock-result"); if (lockSet.contains(lock)) //We locked this, but we still return true since it's locked - out.writeBoolean(true); + result = true; else if (lockSet.stream().anyMatch(a -> a.compareTo(lock) == 0)) - out.writeBoolean(true); //There is a lock (not ours, but it's still locked) + result = true; //There is a lock (not ours, but it's still locked) else - out.writeBoolean(false); //The data is not locked + result = false; //The data is not locked - out.writeUTF(lock.getData()); - serverConnection.sendPluginMessage(identifier, out.toByteArray()); + sendPluginMessage(channel, result, lock.getData(), idempotency, serverConnection, identifier); } - private void tryUnlock(ChannelIdentifier identifier, HashSet lockSet, String data, ServerConnection serverConnection) { + private void tryUnlock(ChannelIdentifier identifier, HashSet lockSet, String data, UUID idempotency, ServerConnection serverConnection) { int hash = serverConnection.getServerInfo().hashCode(); - ByteArrayDataOutput out = ByteStreams.newDataOutput(); - out.writeUTF("try-unlock-result"); + String channel = "try-unlock-result"; Lock lock = new Lock(hash, data); if (lockSet.contains(lock)) //Lock is in the list, but it's made by this server, so we can unlock it { - out.writeBoolean(true); - out.writeUTF(lock.getData()); lockSet.remove(lock); - queueNextLock(lockSet, lock, identifier); + queueNextLock(lockSet, lock, identifier, idempotency); channelLockMap.put(identifier, lockSet); - serverConnection.sendPluginMessage(identifier, out.toByteArray()); + sendPluginMessage(channel, true, lock.getData(), idempotency, serverConnection, identifier); return; } @@ -238,17 +258,13 @@ public class EventListener { if (first.isEmpty()) //There is no entry with this data, so we can say it's unlocked { removeQueuedLock(queuedLocks.get(identifier), lock, hash); - out.writeBoolean(true); - out.writeUTF(lock.getData()); - serverConnection.sendPluginMessage(identifier, out.toByteArray()); - queueNextLock(lockSet, lock, identifier); + sendPluginMessage(channel, true, lock.getData(), idempotency, serverConnection, identifier); + queueNextLock(lockSet, lock, identifier, idempotency); return; } //There is an entry with this data, but it's not owned by this server, so we can't unlock it - out.writeBoolean(false); - out.writeUTF(lock.getData()); - serverConnection.sendPluginMessage(identifier, out.toByteArray()); + sendPluginMessage(channel, false, lock.getData(), idempotency, serverConnection, identifier); } private void removeQueuedLock(HashSet locks, Lock exampleLock, int hash) { @@ -262,7 +278,8 @@ public class EventListener { locks.remove(lock); } - private void queueLock(HashSet lockSet, ChannelIdentifier identifier, Lock lock, ServerConnection serverConnection) { + private void queueLock(HashSet lockSet, ChannelIdentifier identifier, Lock lock, ServerConnection serverConnection, UUID idempotency) { + String channel = "queue-lock-failed"; if (lockSet.contains(lock)) { //Lock already queued we don't have to queue it again return; @@ -275,12 +292,8 @@ public class EventListener { .findAny(); if (optionalRegisteredServer.isPresent()) { //The server that queued this lock is still active, so we can't queue a new one - RegisteredServer registeredServer = optionalRegisteredServer.get(); - ByteArrayDataOutput out = ByteStreams.newDataOutput(); - out.writeUTF("queue-lock-failed"); - out.writeUTF(queuedLock.getData()); - out.writeUTF(registeredServer.getServerInfo().getName()); - serverConnection.sendPluginMessage(identifier, out.toByteArray()); +// RegisteredServer registeredServer = optionalRegisteredServer.get(); todo this was once used in the plugin message, check if it was needed + sendPluginMessage(channel, false, queuedLock.getData(), idempotency, serverConnection, identifier); return; } Logger.warn("Removing queued lock [%] due to being unable to find a server where that lock could be active", queuedLock.getData()); @@ -290,7 +303,8 @@ public class EventListener { queuedLocks.put(identifier, lockSet); } - private void queueNextLock(HashSet lockSet, Lock lock, ChannelIdentifier identifier) { + private void queueNextLock(HashSet lockSet, Lock lock, ChannelIdentifier identifier, UUID idempotency) { + String channel = "locked-queued-lock"; if (!queuedLocks.containsKey(identifier)) return; HashSet queuedLockSet = queuedLocks.get(identifier); @@ -306,15 +320,12 @@ public class EventListener { .findAny(); if (optionalRegisteredServer.isEmpty()) { Logger.warn("Removing queued lock [%] due to being unable to find a server where that lock could be active", queuedLock.getData()); - queueNextLock(lockSet, lock, identifier); + queueNextLock(lockSet, lock, identifier, idempotency); return; } RegisteredServer registeredServer = optionalRegisteredServer.get(); lockSet.add(queuedLock); - ByteArrayDataOutput out = ByteStreams.newDataOutput(); - out.writeUTF("locked-queued-lock"); - out.writeUTF(queuedLock.getData()); - registeredServer.sendPluginMessage(identifier, out.toByteArray()); + sendPluginMessage(channel, true, queuedLock.getData(), idempotency, (ServerConnection) registeredServer, identifier); //TODO test if this cast works } } diff --git a/src/main/java/com/alttd/datalock/Idempotency.java b/src/main/java/com/alttd/datalock/Idempotency.java new file mode 100644 index 0000000..de5ffcb --- /dev/null +++ b/src/main/java/com/alttd/datalock/Idempotency.java @@ -0,0 +1,51 @@ +package com.alttd.datalock; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; + +class Idempotency { + + private final HashMap> idempotencyMap; + + protected Idempotency() { + idempotencyMap = new HashMap<>(); + } + + private HashSet getIdempotencySet(RequestType requestType) { + return idempotencyMap.getOrDefault(requestType, new HashSet<>()); + } + + private void putIdempotencySet(RequestType requestType, HashSet idempotencySet) { + idempotencyMap.put(requestType, idempotencySet); + } + + /** + * Add IdempotencyData to the list of active queries + * @param idempotencyData Data to add to the set + * @return true if entry did not exist yet + */ + protected synchronized boolean putIdempotencyData(RequestType requestType, IdempotencyData idempotencyData) { + HashSet idempotencySet = getIdempotencySet(requestType); + boolean result = idempotencySet.add(idempotencyData); + putIdempotencySet(requestType, idempotencySet); + return result; + } + + /** + * Remove IdempotencyData from the list of active queries + * @param idempotencyData Data to remove from the set + * @return True if the data that was requested to be removed was in the set and was removed + */ + protected synchronized boolean removeIdempotencyData(RequestType requestType, IdempotencyData idempotencyData) { + HashSet idempotencySet = getIdempotencySet(requestType); + boolean result = idempotencySet.remove(idempotencyData); + putIdempotencySet(requestType, idempotencySet); + return result; + } + + protected synchronized Set getIdempotencyData(RequestType requestType) { + return Collections.unmodifiableSet(getIdempotencySet(requestType)); + } +} diff --git a/src/main/java/com/alttd/datalock/IdempotencyData.java b/src/main/java/com/alttd/datalock/IdempotencyData.java new file mode 100644 index 0000000..2a5f0ab --- /dev/null +++ b/src/main/java/com/alttd/datalock/IdempotencyData.java @@ -0,0 +1,10 @@ +package com.alttd.datalock; + +import java.util.UUID; + +record IdempotencyData(RequestType channel, String data, UUID idempotencyToken) { + @Override + public String toString() { + return "Channel: [" + channel + "] Data: [" + data + "] Idempotency Token: [" + idempotencyToken + "]"; + } +} \ No newline at end of file diff --git a/src/main/java/com/alttd/datalock/RequestType.java b/src/main/java/com/alttd/datalock/RequestType.java new file mode 100644 index 0000000..92cd66a --- /dev/null +++ b/src/main/java/com/alttd/datalock/RequestType.java @@ -0,0 +1,13 @@ +package com.alttd.datalock; + +enum RequestType { + TRY_LOCK("try-lock"), + TRY_UNLOCK("try-unlock"), + CHECK_LOCK("check-lock"); + + String subChannel; + + RequestType(String subChannel) { + this.subChannel = subChannel; + } +}