From 1579430f76eee4d3abbac51c911e279632af9628 Mon Sep 17 00:00:00 2001 From: Michael Chen Date: Tue, 11 Jan 2022 20:32:25 +0100 Subject: [PATCH] ChunkWise message waiter Refined Storage basic implementation --- MinecraftDiscordBot/BotConfiguration.cs | 2 + MinecraftDiscordBot/ConnectedComputer.cs | 336 +++++++++++++++++++++++ MinecraftDiscordBot/Message.cs | 47 +++- MinecraftDiscordBot/Program.cs | 82 +++++- 4 files changed, 448 insertions(+), 19 deletions(-) create mode 100644 MinecraftDiscordBot/ConnectedComputer.cs diff --git a/MinecraftDiscordBot/BotConfiguration.cs b/MinecraftDiscordBot/BotConfiguration.cs index 7ee1705..1a3af65 100644 --- a/MinecraftDiscordBot/BotConfiguration.cs +++ b/MinecraftDiscordBot/BotConfiguration.cs @@ -9,4 +9,6 @@ public class BotConfiguration { public int Port { get; set; } = default!; [JsonProperty("channels", Required = Required.Always)] public IEnumerable Channels { get; set; } = default!; + [JsonProperty("prefix", Required = Required.Always)] + public string Prefix { get; set; } = default!; } diff --git a/MinecraftDiscordBot/ConnectedComputer.cs b/MinecraftDiscordBot/ConnectedComputer.cs new file mode 100644 index 0000000..7455e84 --- /dev/null +++ b/MinecraftDiscordBot/ConnectedComputer.cs @@ -0,0 +1,336 @@ +using Discord; +using Discord.WebSocket; +using Fleck; +using Newtonsoft.Json; +using System.Diagnostics; +using System.Text; + +namespace MinecraftDiscordBot; + +public class ConnectedComputer { + protected readonly IWebSocketConnection _socket; + public ConnectedComputer(IWebSocketConnection socket) { + socket.OnMessage = OnMessage; + _socket = socket; + } + + private void OnMessage(string message) { + var msg = JsonConvert.DeserializeObject(message); + if (msg is null) throw new InvalidProgramException("Unexpected Message!"); + IChunkWaiter? waiter; + lock (_syncRoot) { + if (!_waits.TryGetValue(msg.AnswerId, out waiter)) { + Console.WriteLine($"Invalid wait id '{msg.AnswerId}'!"); + return; + } + } + waiter.AddChunk(msg.Chunk, msg.Total, msg.Result); + if (waiter.Finished || waiter.IsCancellationRequested) + lock (_syncRoot) + _waits.Remove(waiter.ID); + } + public Task Send(string message) => _socket.Send(message); + protected Task Send(Message message) => Send(JsonConvert.SerializeObject(message)); + private readonly object _syncRoot = new(); + private readonly Dictionary _waits = new(); + private readonly Random _rnd = new(); + public IWebSocketConnectionInfo ConnectionInfo => _socket.ConnectionInfo; + + protected interface IChunkWaiter { + bool Finished { get; } + int ID { get; } + bool IsCancellationRequested { get; } + void AddChunk(int chunkId, int totalChunks, string value); + } + + protected class ChunkWaiter : IChunkWaiter { + public int ID { get; } + private readonly CancellationToken _ct; + public ChunkWaiter(int id, Func resultParser, CancellationToken ct) { + ID = id; + this.resultParser = resultParser; + _ct = ct; + } + private readonly TaskCompletionSource tcs = new(); + private readonly Func resultParser; + public Task Task => tcs.Task.WaitAsync(_ct); + public bool Finished { get; private set; } = false; + public bool IsCancellationRequested => _ct.IsCancellationRequested; + private string?[]? _chunks = null; + private int _receivedChunks = 0; + private readonly object _syncRoot = new(); + public void AddChunk(int chunkId, int totalChunks, string value) { + lock (_syncRoot) { + if (_chunks is null) _chunks = new string[totalChunks]; + else if (_chunks.Length != totalChunks) throw new InvalidOperationException("Different numbers of chunks in same message ID!"); + ref string? chunk = ref _chunks[chunkId - 1]; // Lua 1-indexed + chunk = chunk is null ? value : throw new InvalidOperationException($"Chunk with ID {chunkId} was already received!"); + } + if (++_receivedChunks == totalChunks) FinalizeResult(_chunks); + } + private void FinalizeResult(string?[] _chunks) { + tcs.SetResult(resultParser(string.Concat(_chunks))); + Finished = true; + } + } + + protected int GetFreeId() { + int i = 10; + while (i-- >= 0) { + var id = _rnd.Next(); + if (!_waits.ContainsKey(id)) + return id; + } + throw new InvalidOperationException("Could not get a free ID after many attempts!"); + } + + protected ChunkWaiter GetWaiter(Func resultParser, CancellationToken ct) { + ChunkWaiter waiter; + lock (_syncRoot) { + waiter = new ChunkWaiter(GetFreeId(), resultParser, ct); + _waits.Add(waiter.ID, waiter); + } + return waiter; + } + + protected static Func Deserialize() => msg + => JsonConvert.DeserializeObject(msg) ?? throw new InvalidProgramException("Empty response!"); +} + +public class RefinedStorageComputer : ConnectedComputer { + public const string Role = "rs"; + private const string CmdEnergyUsage = "energyusage"; + private const string CmdEnergyStorage = "energystorage"; + private const string CmdListItems = "listitems"; + private const string CmdItemName = "itemname"; + private const string CmdListFluids = "listfluids"; + + public RefinedStorageComputer(IWebSocketConnection socket) : base(socket) { } + public async Task GetEnergyUsageAsync(CancellationToken ct) { + var waiter = GetWaiter(int.Parse, ct); + await Send(new RequestMessage(waiter.ID, CmdEnergyUsage)); + return await waiter.Task; + } + public async Task GetEnergyStorageAsync(CancellationToken ct) { + var waiter = GetWaiter(int.Parse, ct); + await Send(new RequestMessage(waiter.ID, CmdEnergyStorage)); + return await waiter.Task; + } + public async Task> ListItemsAsync(CancellationToken ct) { + var waiter = GetWaiter(Deserialize>(), ct); + await Send(new RequestMessage(waiter.ID, CmdListItems)); + return await waiter.Task; + } + + public async Task> ListFluidsAsync(CancellationToken ct) { + var waiter = GetWaiter(Deserialize>(), ct); + await Send(new RequestMessage(waiter.ID, CmdListFluids)); + return await waiter.Task; + } + + public async Task HandleCommand(SocketUserMessage message, string[] parameters, CancellationToken ct) { + if (parameters is not { Length: > 0 }) { + await message.ReplyAsync($"Refined Storage system is online"); + return; + } + + try { + switch (parameters[0].ToLower()) { + case CmdEnergyUsage: + await message.ReplyAsync($"Refined Storage system currently uses {await GetEnergyUsageAsync(ct)} RF/t"); + break; + case CmdEnergyStorage: + await message.ReplyAsync($"Refined Storage system stores {await GetEnergyStorageAsync(ct)} RF/t"); + break; + case CmdListItems: + await HandleItemListing(message, ct); + break; + case CmdItemName: + await HandleItemName(message, parameters, ct); + break; + case CmdListFluids: + await HandleFluidListing(message, ct); + break; + case string other: + await message.ReplyAsync($"Refined Storages cannot do '{other}', bruh"); + break; + } + } catch (TaskCanceledException) { + await message.ReplyAsync("The Refined Storage system request timed out!"); + } + } + + private async Task HandleItemName(SocketUserMessage message, string[] parameters, CancellationToken ct) { + if (parameters.Length < 2) await message.ReplyAsync($"Usage: {CmdItemName} filters..."); + else { + var items = await FilterItems(message, parameters[1..], ct); + var sb = new StringBuilder(); + sb.AppendLine("Did you mean:"); + sb.AppendJoin("\n", items.Select(i => i.ToString())); + await message.ReplyAsync(sb.ToString()); + } + } + + private Task> FilterItems(SocketUserMessage message, IEnumerable filters, CancellationToken ct) + => FilterItems(message, filters.Select(ItemFilter.Parse), ct); + + private async Task> FilterItems(SocketUserMessage message, IEnumerable filters, CancellationToken ct) { + var items = Items?.ToList().AsEnumerable(); + if (items is null) items = (await RefreshItemList(ct)).ToList(); + foreach (var filter in filters) + items = items.Where(filter.MatchItem); + return items.ToList(); + } + + public abstract class ItemFilter { + public abstract bool Match(Fluid item); + public virtual bool MatchItem(Item item) => Match(item); + + public static ItemFilter Parse(string filter) + => filter.StartsWith('@') + ? new ModNameFilter(filter[1..]) + : filter.StartsWith('$') + ? new TagFilter(filter[1..]) + : new ItemNameFilter(filter); + + private class ModNameFilter : ItemFilter { + private readonly string filter; + public ModNameFilter(string filter) => this.filter = filter; + public override bool Match(Fluid item) => item.ItemId.ModName.Contains(filter, StringComparison.InvariantCultureIgnoreCase); + } + + private class TagFilter : ItemFilter { + private readonly string filter; + public TagFilter(string filter) => this.filter = filter; + public override bool Match(Fluid item) + => item.Tags?.Any(tag => tag.Contains(filter, StringComparison.InvariantCultureIgnoreCase)) ?? false; + } + + private class ItemNameFilter : ItemFilter { + private readonly string filter; + public ItemNameFilter(string filter) => this.filter = filter; + public override bool Match(Fluid item) => item.DisplayName.Contains(filter, StringComparison.InvariantCultureIgnoreCase); + } + } + + private async Task HandleFluidListing(SocketUserMessage message, CancellationToken ct) { + var sb = new StringBuilder(); + sb.Append("The Refined Storage system stores those fluids:"); + var fluids = await ListFluidsAsync(ct); + foreach (var fluid in fluids.OrderByDescending(i => i.Amount)) + if (fluid.Amount > 10000) sb.AppendFormat("\n{0:n2} B of {1}", fluid.Amount / 1000.0f, fluid.DisplayName); + else sb.AppendFormat("\n{0:n0} mB of {1}", fluid.Amount, fluid.DisplayName); + await message.ReplyAsync(sb.ToString()); + } + + private List? Items; + private readonly object _itemLock = new(); + + private async Task HandleItemListing(SocketUserMessage message, CancellationToken ct) { + var sb = new StringBuilder(); + sb.Append("The Refined Storage system currently stores these items:"); + var items = await RefreshItemList(ct); + lock (_itemLock) { + int taken = 0; + foreach (var item in items) { + if (sb.Length > 500) break; + sb.AppendFormat("\n{0:n0}x {1}", item.Amount, item.DisplayName); + taken++; + } + if (items.Count > taken) sb.AppendFormat("\nand {0} more items.", items.Skip(taken).Sum(i => i.Amount)); + } + await message.ReplyAsync(sb.ToString()); + } + + private async Task> RefreshItemList(CancellationToken ct) { + var response = await ListItemsAsync(ct); + lock (_itemLock) { + Items = response.OrderByDescending(i => i.Amount).ToList(); + return Items; + } + } +} + +[JsonObject(MemberSerialization.OptIn, Description = "Describes an item in a Refined Storage system.", MissingMemberHandling = MissingMemberHandling.Ignore)] +[DebuggerDisplay($"{{{nameof(ToString)}(),nq}}")] +public class Item : Fluid { + [JsonProperty("fingerprint", Required = Required.Always)] + public Md5Hash Fingerprint { get; set; } = default!; + [JsonProperty("nbt", Required = Required.DisallowNull)] + public dynamic? NBT { get; set; } + public override string ToString() => $"{Amount:n0}x {DisplayName}"; +} + +[JsonObject(MemberSerialization.OptIn, Description = "Describes a fluid in a Refined Storage system.", MissingMemberHandling = MissingMemberHandling.Ignore)] +[DebuggerDisplay($"{{{nameof(ToString)}(),nq}}")] +public class Fluid { + [JsonProperty("amount", Required = Required.Always)] + public int Amount { get; set; } + [JsonProperty("displayName", Required = Required.Always)] + public string DisplayName { get; set; } = default!; + [JsonProperty("tags", Required = Required.DisallowNull)] + public string[]? Tags { get; set; } = default; + [JsonProperty("name", Required = Required.Always)] + public ModItemId ItemId { get; set; } = default!; + public override string ToString() => Amount > 10000 + ? $"{Amount / 1000.0f:n2} B of {DisplayName}" + : $"{Amount:n0} mB of {DisplayName}"; +} + +[JsonConverter(typeof(ModItemIdJsonConverter))] +[DebuggerDisplay($"{{{nameof(ToString)}(),nq}}")] +public class ModItemId { + public ModItemId(string name) { + var colon = name.IndexOf(':'); + if (colon < 0) throw new ArgumentException("Invalid mod item id!", nameof(name)); + ModName = name[..colon]; + ModItem = name[(colon + 1)..]; +#if DEBUG + if (ToString() != name) throw new InvalidProgramException("Bad Parsing!"); +#endif + } + public override string ToString() => $"{ModName}:{ModItem}"; + public string ModName { get; } + public string ModItem { get; } + + public class ModItemIdJsonConverter : JsonConverter { + public override ModItemId? ReadJson(JsonReader reader, Type objectType, ModItemId? existingValue, bool hasExistingValue, JsonSerializer serializer) + => reader.Value is string value + ? new(value) + : throw new JsonException($"Could not parse mod name with token '{reader.Value}'"); + public override void WriteJson(JsonWriter writer, ModItemId? value, JsonSerializer serializer) { + if (value is null) writer.WriteNull(); + else writer.WriteValue(value.ToString()); + } + } +} + +[JsonConverter(typeof(Md5JsonConverter))] +[DebuggerDisplay($"{{{nameof(ToString)}(),nq}}")] +public class Md5Hash : IEquatable { + private readonly byte[] _hash; + public Md5Hash(string hash) : this(Convert.FromHexString(hash)) { } + public Md5Hash(byte[] hash) { + if (hash is not { Length: 16 }) throw new ArgumentException("Invalid digest size!", nameof(hash)); + _hash = hash; + } + public override bool Equals(object? obj) => Equals(obj as Md5Hash); + public bool Equals(Md5Hash? other) => other != null && _hash.SequenceEqual(other._hash); + public override int GetHashCode() { + var hashCode = new HashCode(); + hashCode.AddBytes(_hash); + return hashCode.ToHashCode(); + } + public override string ToString() => Convert.ToHexString(_hash); + + public class Md5JsonConverter : JsonConverter { + public override Md5Hash? ReadJson(JsonReader reader, Type objectType, Md5Hash? existingValue, bool hasExistingValue, JsonSerializer serializer) + => reader.Value is string { Length: 32 } value + ? new(value) + : throw new JsonException($"Could not parse MD5 hash with token '{reader.Value}'"); + public override void WriteJson(JsonWriter writer, Md5Hash? value, JsonSerializer serializer) { + if (value is null) writer.WriteNull(); + else writer.WriteValue(value.ToString()); + } + } +} \ No newline at end of file diff --git a/MinecraftDiscordBot/Message.cs b/MinecraftDiscordBot/Message.cs index a55929e..4a6dcab 100644 --- a/MinecraftDiscordBot/Message.cs +++ b/MinecraftDiscordBot/Message.cs @@ -7,6 +7,13 @@ public abstract class Message { [JsonProperty("type")] public abstract string Type { get; } } + +public class CapabilityMessage : Message { + public override string Type => "roles"; + [JsonProperty("role", Required = Required.Always)] + public string Role { get; set; } = default!; +} + public class TextMessage : Message { public TextMessage(SocketMessage arg) : this(arg.Author.Username, arg.Content) { } public TextMessage(string author, string content) { @@ -14,8 +21,40 @@ public class TextMessage : Message { Content = content; } public override string Type => "text"; - [JsonProperty("author")] - public string Author { get; } - [JsonProperty("message")] - public string Content { get; } + [JsonProperty("author", Required = Required.Always)] + public string Author { get; set; } + [JsonProperty("message", Required = Required.Always)] + public string Content { get; set; } +} + +public class ReplyMessage : Message { + public ReplyMessage(int answerId, string result) { + AnswerId = answerId; + Result = result; + } + [JsonProperty("id", Required = Required.Always)] + public int AnswerId { get; set; } + [JsonProperty("result", Required = Required.Always)] + public string Result { get; set; } + [JsonProperty("chunk", Required = Required.Always)] + public int Chunk { get; set; } + [JsonProperty("total", Required = Required.Always)] + public int Total { get; set; } + public override string Type => "reply"; +} + +public class RequestMessage : Message { + public RequestMessage(int answerId, string method, Dictionary? parameters = null) { + AnswerId = answerId; + Method = method; + Parameters = (parameters ?? Enumerable.Empty>()) + .ToDictionary(i => i.Key, i => i.Value); + } + [JsonProperty("id")] + public int AnswerId { get; set; } + [JsonProperty("method")] + public string Method { get; set; } + [JsonProperty("params")] + public Dictionary Parameters { get; } + public override string Type => "request"; } \ No newline at end of file diff --git a/MinecraftDiscordBot/Program.cs b/MinecraftDiscordBot/Program.cs index 7071d8c..a4ccd29 100644 --- a/MinecraftDiscordBot/Program.cs +++ b/MinecraftDiscordBot/Program.cs @@ -1,9 +1,11 @@ using Discord; +using Discord.Commands; using Discord.Rest; using Discord.WebSocket; using Fleck; using Newtonsoft.Json; using System.Collections.Concurrent; +using System.Reflection; namespace MinecraftDiscordBot; @@ -16,12 +18,14 @@ public class Program { private readonly WebSocketServer _wssv; private readonly BotConfiguration _config; private readonly HashSet _whitelistedChannels; - private readonly ConcurrentDictionary _connections = new(); + private readonly ConcurrentDictionary _connections = new(); + private static readonly char[] WhiteSpace = new char[] { '\t', '\n', ' ', '\r' }; + private RefinedStorageComputer? _rsSystem = null; public Program(BotConfiguration config) { _config = config; _client.Log += Log; - _client.MessageReceived += MessageReceived; + _client.MessageReceived += (msg) => DiscordMessageReceived(msg); _wssv = new WebSocketServer($"ws://0.0.0.0:{config.Port}") { RestartAfterListenError = true }; @@ -41,7 +45,9 @@ public class Program { }); await _client.LoginAsync(TokenType.Bot, _config.Token); await _client.StartAsync(); +#if !DEBUG await VerifyTextChannels(); +#endif // Block this task until the program is closed. await Task.Delay(-1); @@ -57,33 +63,79 @@ public class Program { } } - private async Task SocketReceived(IWebSocketConnection socket, string message) - => await Log(new LogMessage(LogSeverity.Info, WebSocketSource, $"[{socket.ConnectionInfo.Id}] Received: {message}")).ConfigureAwait(false); + private async Task SocketReceived(IWebSocketConnection socket, string message) { + var capability = JsonConvert.DeserializeObject(message); - private async Task SocketClosed(IWebSocketConnection socket) { + if (capability is null) return; + var pc = capability.Role switch { + RefinedStorageComputer.Role => new RefinedStorageComputer(socket), + string role => throw new ArgumentException($"Invalid role '{role}'!") + }; + AddComputerSocket(socket, pc); + await Log(new LogMessage(LogSeverity.Info, WebSocketSource, $"[{socket.ConnectionInfo.Id}] Presented capability as {pc.GetType().Name}")).ConfigureAwait(false); + } + + private void AddComputerSocket(IWebSocketConnection socket, RefinedStorageComputer pc) { + _connections[socket.ConnectionInfo.Id] = pc; + if (pc is not null) _rsSystem = pc; + } + + private void RemoveComputerSocket(IWebSocketConnection socket) { if (!_connections.TryRemove(socket.ConnectionInfo.Id, out _)) throw new InvalidProgramException("Could not remove non-existing client!"); + if (_rsSystem?.ConnectionInfo.Id == socket.ConnectionInfo.Id) _rsSystem = null; + } + + private async Task SocketClosed(IWebSocketConnection socket) { + RemoveComputerSocket(socket); await Log(new LogMessage(LogSeverity.Info, WebSocketSource, $"[{socket.ConnectionInfo.Id}] Client disconnected!")).ConfigureAwait(false); } - private async Task SocketOpened(IWebSocketConnection socket) { - if (!_connections.TryAdd(socket.ConnectionInfo.Id, socket)) - throw new InvalidProgramException("Could not add already-existing client!"); - await Log(new LogMessage(LogSeverity.Info, WebSocketSource, $"[{socket.ConnectionInfo.Id}] Client connected from {socket.ConnectionInfo.ClientIpAddress}:{socket.ConnectionInfo.ClientPort}!")).ConfigureAwait(false); - } + private async Task SocketOpened(IWebSocketConnection socket) => await Log(new LogMessage(LogSeverity.Info, WebSocketSource, $"[{socket.ConnectionInfo.Id}] Client connected from {socket.ConnectionInfo.ClientIpAddress}:{socket.ConnectionInfo.ClientPort}!")).ConfigureAwait(false); - private async Task MessageReceived(SocketMessage arg) { - if (arg.Author.IsBot) return; - if (IsChannelWhitelisted(arg.Channel)) - await Log(new LogMessage(LogSeverity.Info, "Discord", $"[{arg.Author.Username}] {arg.Content}")).ConfigureAwait(false); + private async Task DiscordMessageReceived(SocketMessage arg, int timeout = 10000) { + if (arg is not SocketUserMessage message) return; + if (message.Author.IsBot) return; + if (!IsChannelWhitelisted(arg.Channel)) return; + + var cts = new CancellationTokenSource(timeout +#if DEBUG + * 1000 +#endif + ); + + if (IsCommand(message, out var argPos)) { + var parameters = message.Content[argPos..].Split(WhiteSpace, StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); + _ = Task.Run(() => HandleCommand(message, parameters, cts.Token)); + return; + } + + await Log(new LogMessage(LogSeverity.Info, "Discord", $"[{arg.Author.Username}] {arg.Content}")).ConfigureAwait(false); await SendToAll(JsonConvert.SerializeObject(new TextMessage(arg))); } + private Task HandleCommand(SocketUserMessage message, string[] parameters, CancellationToken ct) + => parameters is { Length: > 0 } + ? parameters[0].ToLower() switch { + RefinedStorageComputer.Role => HandleRefinedStorageCommand(message, parameters[1..], ct), + _ => message.ReplyAsync($"What the fuck do you mean by '{parameters[0]}'?") + } + : message.ReplyAsync($"You really think an empty command works?"); + + private Task HandleRefinedStorageCommand(SocketUserMessage message, string[] parameters, CancellationToken ct) + => _rsSystem is null + ? message.ReplyAsync("The Refined Storage system is currently unavailable!") + : _rsSystem.HandleCommand(message, parameters, ct); + + private bool IsCommand(SocketUserMessage message, out int argPos) { + argPos = 0; + return message.HasStringPrefix(_config.Prefix, ref argPos); + } private bool IsChannelWhitelisted(ISocketMessageChannel channel) => _whitelistedChannels.Contains(channel.Id); private async Task SendToAll(string message) { - async Task SendToClient(KeyValuePair cp) { + async Task SendToClient(KeyValuePair cp) { try { await cp.Value.Send(message); } catch (Exception e) {