Added Idempotency

This commit is contained in:
Teriuihi 2022-12-05 21:36:51 +01:00
parent b0d84cf5e4
commit 643c515cbd
4 changed files with 138 additions and 53 deletions

View File

@ -17,6 +17,9 @@ import java.util.stream.Collectors;
public class EventListener {
private EventListener() {}
// private final Idempotency idempotencyStorage = new Idempotency();
private final HashMap<ChannelIdentifier, HashSet<Lock>> queuedLocks = new HashMap<>();
private final HashMap<ChannelIdentifier, HashSet<Lock>> channelLockMap = new HashMap<>();
private final List<ChannelIdentifier> channelIdentifierList = new ArrayList<>();
@ -61,7 +64,7 @@ public class EventListener {
}
for (Lock lock : temp) {
locks.remove(lock);
queueNextLock(locks, lock, key);
queueNextLock(locks, lock, key, null);
if (Config.DEBUG)
Logger.info("Clearing % from % due to clear server being called for the server that lock is on", lock.getData(), key.getId());
}
@ -132,13 +135,34 @@ public class EventListener {
if (!isValid(channel.toLowerCase(), data))
return;
UUID idempotency;
try {
idempotency = UUID.fromString(in.readUTF());
} catch (Exception e) {
Logger.error("No idempotency key found.",
identifier.getId());
idempotency = null; //TODO change this to return
}
if (Config.DEBUG)
Logger.info("Plugin message channel: [%]", channel.toLowerCase());
switch (channel.toLowerCase()) {
case "try-lock" -> tryLock(identifier, hashLock, data, serverConnection);
case "check-lock" -> checkLock(identifier, hashLock, data, serverConnection);
case "try-unlock" -> tryUnlock(identifier, hashLock, data, serverConnection);
Optional<RequestType> first = Arrays.stream(RequestType.values()).filter(value -> value.subChannel.equalsIgnoreCase(channel)).findFirst();
if (first.isEmpty()) {
Logger.warn("Received invalid request type [%]", channel.toLowerCase());
return;
}
// RequestType requestType = first.get();
// if (!idempotencyStorage.putIdempotencyData(requestType, new IdempotencyData(requestType, data, idempotency)))
// return;
// switch (requestType) {
//
// }
switch (channel.toLowerCase()) { //TODO something with idempotency for function
case "try-lock" -> tryLock(identifier, hashLock, data, idempotency, serverConnection);
case "check-lock" -> checkLock(identifier, hashLock, data, idempotency, serverConnection);
case "try-unlock" -> tryUnlock(identifier, hashLock, data, idempotency, serverConnection);
}
}
@ -159,16 +183,21 @@ public class EventListener {
}
}
private void tryLock(ChannelIdentifier identifier, HashSet<Lock> lockSet, String data, ServerConnection serverConnection) {
private void sendPluginMessage(String channel, boolean result, String data, UUID idempotency, ServerConnection serverConnection, ChannelIdentifier identifier) {
ByteArrayDataOutput out = ByteStreams.newDataOutput();
out.writeUTF("try-lock-result");
out.writeUTF(channel);
out.writeBoolean(result);
out.writeUTF(data);
out.writeUTF(idempotency.toString());
serverConnection.sendPluginMessage(identifier, out.toByteArray());
}
private void tryLock(ChannelIdentifier identifier, HashSet<Lock> lockSet, String data, UUID idempotency, ServerConnection serverConnection) {
String channel = "try-lock-result";
Lock lock = new Lock(serverConnection.getServerInfo().hashCode(), data);
if (lockSet.contains(lock)) {
//An entry from this server already exists, so we can say that it's locked
out.writeBoolean(true);
out.writeUTF(lock.getData());
serverConnection.sendPluginMessage(identifier, out.toByteArray());
sendPluginMessage(channel, true, lock.getData(), idempotency, serverConnection, identifier);
return;
}
@ -184,10 +213,8 @@ public class EventListener {
}
else {
//An entry from another server exists, so we can't lock it
out.writeBoolean(false);
out.writeUTF(lock.getData());
serverConnection.sendPluginMessage(identifier, out.toByteArray());
queueLock(queuedLocks.getOrDefault(identifier, new HashSet<>()), identifier, lock, serverConnection);
sendPluginMessage(channel, false, lock.getData(), idempotency, serverConnection, identifier);
queueLock(queuedLocks.getOrDefault(identifier, new HashSet<>()), identifier, lock, serverConnection, idempotency);
return;
}
}
@ -195,42 +222,35 @@ public class EventListener {
//Lock the data
lockSet.add(lock);
channelLockMap.put(identifier, lockSet);
out.writeBoolean(true);
out.writeUTF(lock.getData());
serverConnection.sendPluginMessage(identifier, out.toByteArray());
sendPluginMessage(channel, true, lock.getData(), idempotency, serverConnection, identifier);
}
private void checkLock(ChannelIdentifier identifier, HashSet<Lock> lockSet, String data, ServerConnection serverConnection) {
ByteArrayDataOutput out = ByteStreams.newDataOutput();
private void checkLock(ChannelIdentifier identifier, HashSet<Lock> lockSet, String data, UUID idempotency, ServerConnection serverConnection) {
Lock lock = new Lock(serverConnection.hashCode(), data);
String channel = "check-lock-result";
boolean result;
out.writeUTF("check-lock-result");
if (lockSet.contains(lock)) //We locked this, but we still return true since it's locked
out.writeBoolean(true);
result = true;
else if (lockSet.stream().anyMatch(a -> a.compareTo(lock) == 0))
out.writeBoolean(true); //There is a lock (not ours, but it's still locked)
result = true; //There is a lock (not ours, but it's still locked)
else
out.writeBoolean(false); //The data is not locked
result = false; //The data is not locked
out.writeUTF(lock.getData());
serverConnection.sendPluginMessage(identifier, out.toByteArray());
sendPluginMessage(channel, result, lock.getData(), idempotency, serverConnection, identifier);
}
private void tryUnlock(ChannelIdentifier identifier, HashSet<Lock> lockSet, String data, ServerConnection serverConnection) {
private void tryUnlock(ChannelIdentifier identifier, HashSet<Lock> lockSet, String data, UUID idempotency, ServerConnection serverConnection) {
int hash = serverConnection.getServerInfo().hashCode();
ByteArrayDataOutput out = ByteStreams.newDataOutput();
out.writeUTF("try-unlock-result");
String channel = "try-unlock-result";
Lock lock = new Lock(hash, data);
if (lockSet.contains(lock)) //Lock is in the list, but it's made by this server, so we can unlock it
{
out.writeBoolean(true);
out.writeUTF(lock.getData());
lockSet.remove(lock);
queueNextLock(lockSet, lock, identifier);
queueNextLock(lockSet, lock, identifier, idempotency);
channelLockMap.put(identifier, lockSet);
serverConnection.sendPluginMessage(identifier, out.toByteArray());
sendPluginMessage(channel, true, lock.getData(), idempotency, serverConnection, identifier);
return;
}
@ -238,17 +258,13 @@ public class EventListener {
if (first.isEmpty()) //There is no entry with this data, so we can say it's unlocked
{
removeQueuedLock(queuedLocks.get(identifier), lock, hash);
out.writeBoolean(true);
out.writeUTF(lock.getData());
serverConnection.sendPluginMessage(identifier, out.toByteArray());
queueNextLock(lockSet, lock, identifier);
sendPluginMessage(channel, true, lock.getData(), idempotency, serverConnection, identifier);
queueNextLock(lockSet, lock, identifier, idempotency);
return;
}
//There is an entry with this data, but it's not owned by this server, so we can't unlock it
out.writeBoolean(false);
out.writeUTF(lock.getData());
serverConnection.sendPluginMessage(identifier, out.toByteArray());
sendPluginMessage(channel, false, lock.getData(), idempotency, serverConnection, identifier);
}
private void removeQueuedLock(HashSet<Lock> locks, Lock exampleLock, int hash) {
@ -262,7 +278,8 @@ public class EventListener {
locks.remove(lock);
}
private void queueLock(HashSet<Lock> lockSet, ChannelIdentifier identifier, Lock lock, ServerConnection serverConnection) {
private void queueLock(HashSet<Lock> lockSet, ChannelIdentifier identifier, Lock lock, ServerConnection serverConnection, UUID idempotency) {
String channel = "queue-lock-failed";
if (lockSet.contains(lock)) {
//Lock already queued we don't have to queue it again
return;
@ -275,12 +292,8 @@ public class EventListener {
.findAny();
if (optionalRegisteredServer.isPresent()) {
//The server that queued this lock is still active, so we can't queue a new one
RegisteredServer registeredServer = optionalRegisteredServer.get();
ByteArrayDataOutput out = ByteStreams.newDataOutput();
out.writeUTF("queue-lock-failed");
out.writeUTF(queuedLock.getData());
out.writeUTF(registeredServer.getServerInfo().getName());
serverConnection.sendPluginMessage(identifier, out.toByteArray());
// RegisteredServer registeredServer = optionalRegisteredServer.get(); todo this was once used in the plugin message, check if it was needed
sendPluginMessage(channel, false, queuedLock.getData(), idempotency, serverConnection, identifier);
return;
}
Logger.warn("Removing queued lock [%] due to being unable to find a server where that lock could be active", queuedLock.getData());
@ -290,7 +303,8 @@ public class EventListener {
queuedLocks.put(identifier, lockSet);
}
private void queueNextLock(HashSet<Lock> lockSet, Lock lock, ChannelIdentifier identifier) {
private void queueNextLock(HashSet<Lock> lockSet, Lock lock, ChannelIdentifier identifier, UUID idempotency) {
String channel = "locked-queued-lock";
if (!queuedLocks.containsKey(identifier))
return;
HashSet<Lock> queuedLockSet = queuedLocks.get(identifier);
@ -306,15 +320,12 @@ public class EventListener {
.findAny();
if (optionalRegisteredServer.isEmpty()) {
Logger.warn("Removing queued lock [%] due to being unable to find a server where that lock could be active", queuedLock.getData());
queueNextLock(lockSet, lock, identifier);
queueNextLock(lockSet, lock, identifier, idempotency);
return;
}
RegisteredServer registeredServer = optionalRegisteredServer.get();
lockSet.add(queuedLock);
ByteArrayDataOutput out = ByteStreams.newDataOutput();
out.writeUTF("locked-queued-lock");
out.writeUTF(queuedLock.getData());
registeredServer.sendPluginMessage(identifier, out.toByteArray());
sendPluginMessage(channel, true, queuedLock.getData(), idempotency, (ServerConnection) registeredServer, identifier); //TODO test if this cast works
}
}

View File

@ -0,0 +1,51 @@
package com.alttd.datalock;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
class Idempotency {
private final HashMap<RequestType, HashSet<IdempotencyData>> idempotencyMap;
protected Idempotency() {
idempotencyMap = new HashMap<>();
}
private HashSet<IdempotencyData> getIdempotencySet(RequestType requestType) {
return idempotencyMap.getOrDefault(requestType, new HashSet<>());
}
private void putIdempotencySet(RequestType requestType, HashSet<IdempotencyData> idempotencySet) {
idempotencyMap.put(requestType, idempotencySet);
}
/**
* Add IdempotencyData to the list of active queries
* @param idempotencyData Data to add to the set
* @return true if entry did not exist yet
*/
protected synchronized boolean putIdempotencyData(RequestType requestType, IdempotencyData idempotencyData) {
HashSet<IdempotencyData> idempotencySet = getIdempotencySet(requestType);
boolean result = idempotencySet.add(idempotencyData);
putIdempotencySet(requestType, idempotencySet);
return result;
}
/**
* Remove IdempotencyData from the list of active queries
* @param idempotencyData Data to remove from the set
* @return True if the data that was requested to be removed was in the set and was removed
*/
protected synchronized boolean removeIdempotencyData(RequestType requestType, IdempotencyData idempotencyData) {
HashSet<IdempotencyData> idempotencySet = getIdempotencySet(requestType);
boolean result = idempotencySet.remove(idempotencyData);
putIdempotencySet(requestType, idempotencySet);
return result;
}
protected synchronized Set<IdempotencyData> getIdempotencyData(RequestType requestType) {
return Collections.unmodifiableSet(getIdempotencySet(requestType));
}
}

View File

@ -0,0 +1,10 @@
package com.alttd.datalock;
import java.util.UUID;
record IdempotencyData(RequestType channel, String data, UUID idempotencyToken) {
@Override
public String toString() {
return "Channel: [" + channel + "] Data: [" + data + "] Idempotency Token: [" + idempotencyToken + "]";
}
}

View File

@ -0,0 +1,13 @@
package com.alttd.datalock;
enum RequestType {
TRY_LOCK("try-lock"),
TRY_UNLOCK("try-unlock"),
CHECK_LOCK("check-lock");
String subChannel;
RequestType(String subChannel) {
this.subChannel = subChannel;
}
}