diff --git a/Directory.Packages.props b/Directory.Packages.props index f307df8e4..5865fc577 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -10,6 +10,7 @@ + diff --git a/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs b/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs index 75c0985a8..3ae15d104 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs @@ -8,6 +8,13 @@ public class AgentRule [JsonPropertyName("disabled")] public bool Disabled { get; set; } - [JsonPropertyName("criteria")] - public string Criteria { get; set; } = string.Empty; + [JsonPropertyName("config")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public RuleConfig? Config { get; set; } } + +public class RuleConfig +{ + [JsonPropertyName("topology_name")] + public string? TopologyName { get; set; } +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs b/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs index e2ec38a5a..8442eae45 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs @@ -4,4 +4,5 @@ public class CodeExecutionContext { public AgentCodeScript CodeScript { get; set; } public List Arguments { get; set; } = []; + public string? InvokeFrom { get; set; } } diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs new file mode 100644 index 000000000..4df43dd0e --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs @@ -0,0 +1,21 @@ +namespace BotSharp.Abstraction.Infrastructures.MessageQueues; + +/// +/// Abstract interface for message queue consumers. +/// Implement this interface to create consumers that are independent of MQ products (e.g., RabbitMQ, Kafka, Azure Service Bus). +/// +public interface IMQConsumer : IDisposable +{ + /// + /// Gets the consumer config + /// + object Config { get; } + + /// + /// Handles the received message from the queue. + /// + /// The consumer channel identifier + /// The message data as string + /// True if the message was handled successfully, false otherwise + Task HandleMessageAsync(string channel, string data); +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs new file mode 100644 index 000000000..672e539c1 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs @@ -0,0 +1,31 @@ +using BotSharp.Abstraction.Infrastructures.MessageQueues.Models; + +namespace BotSharp.Abstraction.Infrastructures.MessageQueues; + +public interface IMQService : IDisposable +{ + /// + /// Subscribe a consumer to the message queue. + /// The consumer will be initialized with the appropriate MQ-specific infrastructure. + /// + /// Unique identifier for the consumer + /// The consumer implementing IMQConsumer interface + /// Task representing the async subscription operation + Task SubscribeAsync(string key, IMQConsumer consumer); + + /// + /// Unsubscribe a consumer from the message queue. + /// + /// Unique identifier for the consumer + /// Task representing the async unsubscription operation + Task UnsubscribeAsync(string key); + + /// + /// Publish payload to message queue + /// + /// + /// + /// + /// + Task PublishAsync(T payload, MQPublishOptions options); +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs new file mode 100644 index 000000000..cd66be1fd --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs @@ -0,0 +1,51 @@ +using Microsoft.Extensions.Logging; + +namespace BotSharp.Abstraction.Infrastructures.MessageQueues; + +/// +/// Abstract base class for RabbitMQ consumers. +/// Implements IMQConsumer to allow other projects to define consumers independently of RabbitMQ. +/// The RabbitMQ-specific infrastructure is handled by RabbitMQService. +/// +public abstract class MQConsumerBase : IMQConsumer +{ + protected readonly IServiceProvider _services; + protected readonly ILogger _logger; + private bool _disposed = false; + + /// + /// Gets the consumer config for this consumer. + /// Override this property to customize exchange, queue and routing configuration. + /// + public abstract object Config { get; } + + protected MQConsumerBase( + IServiceProvider services, + ILogger logger) + { + _services = services; + _logger = logger; + } + + /// + /// Handles the received message from the queue. + /// + /// The consumer channel identifier + /// The message data as string + /// True if the message was handled successfully, false otherwise + public abstract Task HandleMessageAsync(string channel, string data); + + public void Dispose() + { + if (_disposed) + { + return; + } + + var consumerName = GetType().Name; + _logger.LogWarning($"Disposing consumer: {consumerName}"); + _disposed = true; + GC.SuppressFinalize(this); + } +} + diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs new file mode 100644 index 000000000..b08a5a054 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs @@ -0,0 +1,7 @@ +namespace BotSharp.Abstraction.Infrastructures.MessageQueues; + +public class MessageQueueSettings +{ + public bool Enabled { get; set; } + public string Provider { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs new file mode 100644 index 000000000..e940aff01 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs @@ -0,0 +1,14 @@ +namespace BotSharp.Abstraction.Infrastructures.MessageQueues.Models; + +public class MQMessage +{ + public MQMessage(T payload, string messageId) + { + Payload = payload; + MessageId = messageId; + } + + public T Payload { get; set; } + public string MessageId { get; set; } + public DateTime CreateDate { get; set; } = DateTime.UtcNow; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs new file mode 100644 index 000000000..dead523be --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs @@ -0,0 +1,40 @@ +using System.Text.Json; + +namespace BotSharp.Abstraction.Infrastructures.MessageQueues.Models; + +/// +/// Configuration options for publishing messages to a message queue. +/// These options are MQ-product agnostic and can be adapted by different implementations. +/// +public class MQPublishOptions +{ + /// + /// The topic name (exchange in RabbitMQ, topic in Kafka/Azure Service Bus). + /// + public string TopicName { get; set; } = string.Empty; + + /// + /// The routing key (partition key in some MQ systems, used for message routing). + /// + public string RoutingKey { get; set; } = string.Empty; + + /// + /// Delay in milliseconds before the message is delivered. + /// + public long DelayMilliseconds { get; set; } + + /// + /// Optional unique identifier for the message. + /// + public string? MessageId { get; set; } + + /// + /// Additional arguments for the publish configuration (MQ-specific). + /// + public Dictionary Arguments { get; set; } = []; + + /// + /// Json serializer options + /// + public JsonSerializerOptions? JsonOptions { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs b/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs index 54fc80a90..a39e4bf05 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs @@ -12,5 +12,5 @@ public interface IInstructHook : IHookBase Task OnResponseGenerated(InstructResponseModel response) => Task.CompletedTask; Task BeforeCodeExecution(Agent agent, CodeExecutionContext context) => Task.CompletedTask; - Task AfterCodeExecution(Agent agent, CodeExecutionResponseModel response) => Task.CompletedTask; + Task AfterCodeExecution(Agent agent, CodeExecutionContext context, CodeExecutionResponseModel response) => Task.CompletedTask; } diff --git a/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs b/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs index f5758434c..81023135f 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs @@ -28,7 +28,7 @@ public virtual async Task BeforeCodeExecution(Agent agent, CodeExecutionContext await Task.CompletedTask; } - public virtual async Task AfterCodeExecution(Agent agent, CodeExecutionResponseModel response) + public virtual async Task AfterCodeExecution(Agent agent, CodeExecutionContext context, CodeExecutionResponseModel response) { await Task.CompletedTask; } diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs new file mode 100644 index 000000000..49f2029a7 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs @@ -0,0 +1,13 @@ +using BotSharp.Abstraction.Hooks; +using BotSharp.Abstraction.Rules.Models; + +namespace BotSharp.Abstraction.Rules.Hooks; + +public interface IRuleTriggerHook : IHookBase +{ + Task BeforeRuleConditionExecuting(Agent agent, RuleNode conditionNode, IRuleTrigger trigger, RuleFlowContext context) => Task.CompletedTask; + Task AfterRuleConditionExecuted(Agent agent, RuleNode conditionNode, IRuleTrigger trigger, RuleFlowContext context, RuleNodeResult result) => Task.CompletedTask; + + Task BeforeRuleActionExecuting(Agent agent, RuleNode actionNode, IRuleTrigger trigger, RuleFlowContext context) => Task.CompletedTask; + Task AfterRuleActionExecuted(Agent agent, RuleNode actionNode, IRuleTrigger trigger, RuleFlowContext context, RuleNodeResult result) => Task.CompletedTask; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs index bebac5d6f..041675a2c 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs @@ -1,5 +1,21 @@ +using BotSharp.Abstraction.Rules.Models; + namespace BotSharp.Abstraction.Rules; -public interface IRuleAction +/// +/// Base interface for rule actions that can be executed by the RuleEngine +/// +public interface IRuleAction : IRuleFlowUnit { -} + /// + /// Execute the rule action + /// + /// The agent that triggered the rule + /// The rule trigger + /// The flow context + /// The action execution result + Task ExecuteAsync( + Agent agent, + IRuleTrigger trigger, + RuleFlowContext context); +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCondition.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCondition.cs new file mode 100644 index 000000000..745ad3349 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCondition.cs @@ -0,0 +1,22 @@ +using BotSharp.Abstraction.Rules.Models; + +namespace BotSharp.Abstraction.Rules; + +/// +/// Base interface for rule conditions that can be evaluated by the RuleEngine +/// +public interface IRuleCondition : IRuleFlowUnit +{ + /// + /// Evaluate the rule condition + /// + /// The agent that triggered the rule + /// The rule trigger + /// The flow context + /// The condition evaluation result + Task EvaluateAsync( + Agent agent, + IRuleTrigger trigger, + RuleFlowContext context); +} + diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleConfig.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleConfig.cs deleted file mode 100644 index dcbe18271..000000000 --- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleConfig.cs +++ /dev/null @@ -1,5 +0,0 @@ -namespace BotSharp.Abstraction.Rules; - -public interface IRuleConfig -{ -} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs deleted file mode 100644 index bc5022911..000000000 --- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs +++ /dev/null @@ -1,5 +0,0 @@ -namespace BotSharp.Abstraction.Rules; - -public interface IRuleCriteria -{ -} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs index c7a6d847b..d9958e549 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs @@ -1,3 +1,5 @@ +using BotSharp.Abstraction.Rules.Models; + namespace BotSharp.Abstraction.Rules; public interface IRuleEngine @@ -13,4 +15,15 @@ public interface IRuleEngine /// Task> Triggered(IRuleTrigger trigger, string text, IEnumerable? states = null, RuleTriggerOptions? options = null) => throw new NotImplementedException(); + + /// + /// Execute rule graph node + /// + /// + /// + /// + /// + /// + /// + Task ExecuteGraphNode(RuleNode node, RuleGraph graph, string agentId, IRuleTrigger trigger, RuleNodeExecutionOptions options); } diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleFlow.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleFlow.cs new file mode 100644 index 000000000..71dca2ac9 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleFlow.cs @@ -0,0 +1,26 @@ +using BotSharp.Abstraction.Rules.Models; + +namespace BotSharp.Abstraction.Rules; + +public interface IRuleFlow where T : class +{ + /// + /// Rule flow topology name + /// + string Name { get; } + + /// + /// Get rule flow topology config + /// + /// + /// + Task GetTopologyConfigAsync(RuleFlowConfigOptions? options = null); + + /// + /// Get rule flow topology + /// + /// + /// + /// + Task GetTopologyAsync(string id, RuleFlowLoadOptions? options = null); +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleFlowUnit.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleFlowUnit.cs new file mode 100644 index 000000000..795b2ce01 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleFlowUnit.cs @@ -0,0 +1,21 @@ +using BotSharp.Abstraction.Rules.Models; + +namespace BotSharp.Abstraction.Rules; + +public interface IRuleFlowUnit +{ + /// + /// The unique name of the rule flow unit, i.e., action, condition. + /// + string Name => string.Empty; + + /// + /// The agent id + /// + string? AgentId => null; + + /// + /// The trigger names + /// + IEnumerable? Triggers => null; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleConfigModel.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleConfigModel.cs new file mode 100644 index 000000000..69b5915c0 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleConfigModel.cs @@ -0,0 +1,10 @@ +using System.Text.Json; + +namespace BotSharp.Abstraction.Rules.Models; + +public class RuleConfigModel +{ + public string TopologyId { get; set; } + public string TopologyName { get; set; } + public JsonDocument CustomParameters { get; set; } = JsonDocument.Parse("{}"); +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleFlowContext.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleFlowContext.cs new file mode 100644 index 000000000..4e9040efe --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleFlowContext.cs @@ -0,0 +1,18 @@ +using System.Text.Json; + +namespace BotSharp.Abstraction.Rules.Models; + +/// +/// Context for rule flow execution (actions and conditions) +/// +public class RuleFlowContext +{ + public string Text { get; set; } = string.Empty; + public Dictionary Parameters { get; set; } = []; + public IEnumerable PrevStepResults { get; set; } = []; + public JsonSerializerOptions? JsonOptions { get; set; } + public RuleNode Node { get; set; } + public RuleEdge Edge { get; set; } + public RuleGraph Graph { get; set; } +} + diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleFlowStepResult.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleFlowStepResult.cs new file mode 100644 index 000000000..b4b6d5953 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleFlowStepResult.cs @@ -0,0 +1,22 @@ +namespace BotSharp.Abstraction.Rules.Models; + +public class RuleFlowStepResult : RuleNodeResult +{ + public RuleNode Node { get; set; } + + /// + /// Create a RuleFlowStepResult from a RuleNodeResult and a RuleNode + /// + public static RuleFlowStepResult FromResult(RuleNodeResult result, RuleNode node) + { + return new RuleFlowStepResult + { + Node = node, + Success = result.Success, + Response = result.Response, + ErrorMessage = result.ErrorMessage, + Data = new(result.Data ?? []), + IsDelayed = result.IsDelayed + }; + } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleNodeResult.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleNodeResult.cs new file mode 100644 index 000000000..113126f0d --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleNodeResult.cs @@ -0,0 +1,29 @@ +namespace BotSharp.Abstraction.Rules.Models; + +public class RuleNodeResult +{ + /// + /// Whether the node is executed successfully + /// + public virtual bool Success { get; set; } + + /// + /// Response content from the node + /// + public virtual string? Response { get; set; } + + /// + /// Error message if the node execution failed + /// + public virtual string? ErrorMessage { get; set; } + + /// + /// Result data (used for actions) + /// + public virtual Dictionary Data { get; set; } = []; + + /// + /// Whether the node execution is delayed (used for actions) + /// + public virtual bool IsDelayed { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowConfigOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowConfigOptions.cs new file mode 100644 index 000000000..ddb3e734f --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowConfigOptions.cs @@ -0,0 +1,6 @@ +namespace BotSharp.Abstraction.Rules.Options; + +public class RuleFlowConfigOptions +{ + public string? TopologyName { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowLoadOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowLoadOptions.cs new file mode 100644 index 000000000..38c653bcf --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowLoadOptions.cs @@ -0,0 +1,7 @@ +namespace BotSharp.Abstraction.Rules.Options; + +public class RuleFlowLoadOptions +{ + public string? Query { get; set; } + public Dictionary? Parameters { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowOptions.cs new file mode 100644 index 000000000..6d8709107 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowOptions.cs @@ -0,0 +1,34 @@ +namespace BotSharp.Abstraction.Rules.Options; + +public class RuleFlowOptions +{ + /// + /// Flow topology provider + /// + [JsonPropertyName("topology_provider")] + public string? TopologyProvider { get; set; } + + /// + /// Flow topology name + /// + [JsonPropertyName("topology_name")] + public string? TopologyName { get; set; } + + /// + /// Query to get flow topology + /// + [JsonPropertyName("query")] + public string? Query { get; set; } + + /// + /// Graph traversal algorithm: "dfs" (default) or "bfs" + /// + [JsonPropertyName("traversal_algorithm")] + public string TraversalAlgorithm { get; set; } = "dfs"; + + /// + /// Additional custom parameters, e.g., root_node_name, max_recursion + /// + [JsonPropertyName("parameters")] + public Dictionary Parameters { get; set; } = []; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleNodeExecutionOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleNodeExecutionOptions.cs new file mode 100644 index 000000000..2bc5b5c52 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleNodeExecutionOptions.cs @@ -0,0 +1,11 @@ +using System.Text.Json; + +namespace BotSharp.Abstraction.Rules.Options; + +public class RuleNodeExecutionOptions +{ + public string Text { get; set; } + public IEnumerable States { get; set; } = []; + public JsonSerializerOptions? JsonOptions { get; set; } + public RuleFlowOptions? Flow { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs index 068052b0b..46e35fe86 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs @@ -1,3 +1,4 @@ +using BotSharp.Abstraction.Repositories.Filters; using System.Text.Json; namespace BotSharp.Abstraction.Rules.Options; @@ -5,22 +6,17 @@ namespace BotSharp.Abstraction.Rules.Options; public class RuleTriggerOptions { /// - /// Code processor provider + /// Filter agents /// - public string? CodeProcessor { get; set; } + public AgentFilter? AgentFilter { get; set; } /// - /// Code script name + /// Json serializer options /// - public string? CodeScriptName { get; set; } + public JsonSerializerOptions? JsonOptions { get; set; } /// - /// Argument name as an input key to the code script + /// Rule flow options /// - public string? ArgumentName { get; set; } - - /// - /// Json arguments as an input value to the code script - /// - public JsonDocument? ArgumentContent { get; set; } + public RuleFlowOptions? Flow { get; set; } } diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/RuleGraph.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/RuleGraph.cs new file mode 100644 index 000000000..632a3df2d --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/RuleGraph.cs @@ -0,0 +1,229 @@ +namespace BotSharp.Abstraction.Rules; + +public class RuleGraph +{ + private string _id = Guid.NewGuid().ToString(); + private List _nodes = []; + private List _edges = []; + + public RuleGraph() + { + _id = Guid.NewGuid().ToString(); + _nodes = []; + _edges = []; + } + + public static RuleGraph Init() + { + return new RuleGraph(); + } + + public RuleNode? GetRootNode(string? name = null) + { + if (!string.IsNullOrEmpty(name)) + { + return _nodes.FirstOrDefault(x => x.Name.IsEqualTo(name)); + } + + return _nodes.FirstOrDefault(x => x.Type.IsEqualTo("root") || x.Type.IsEqualTo("start")); + } + + public (RuleNode? Node, IEnumerable IncomingEdges, IEnumerable OutgoingEdges) GetNode(string id) + { + var node = _nodes.FirstOrDefault(x => x.Id.IsEqualTo(id)); + if (node == null) + { + return (null, [], []); + } + + var incomingEdges = _edges + .Where(e => e.To != null && e.To.Id.IsEqualTo(id)) + .OrderByDescending(x => x.Weight) + .ToList(); + var outgoingEdges = _edges + .Where(e => e.From != null && e.From.Id.IsEqualTo(id)) + .OrderByDescending(x => x.Weight) + .ToList(); + + return (node, incomingEdges, outgoingEdges); + } + + public string GetGraphId() + { + return _id; + } + + public IEnumerable GetNodes(Func? filter = null) + { + return filter == null ? [.. _nodes] : [.. _nodes.Where(filter)]; + } + + public IEnumerable GetEdges(Func? filter = null) + { + return filter == null ? [.. _edges] : [.. _edges.Where(filter)]; + } + + public void SetGraphId(string id) + { + _id = id; + } + + public void SetNodes(IEnumerable nodes) + { + _nodes = [.. nodes?.ToList() ?? []]; + } + + public void SetEdges(IEnumerable edges) + { + _edges = [.. edges?.ToList() ?? []]; + } + + public void AddNode(RuleNode node) + { + var found = _nodes.Exists(x => x.Id.IsEqualTo(node.Id)); + if (!found) + { + _nodes.Add(node); + } + } + + public void AddEdge(RuleNode from, RuleNode to, GraphItemPayload payload) + { + var sourceFound = _nodes.Exists(x => x.Id.IsEqualTo(from.Id)); + var targetFound = _nodes.Exists(x => x.Id.IsEqualTo(to.Id)); + var edgeFound = _edges.Exists(x => x.Id.IsEqualTo(payload.Id)); + + if (!sourceFound) + { + _nodes.Add(from); + } + + if (!targetFound) + { + _nodes.Add(to); + } + + if (!edgeFound) + { + _edges.Add(new RuleEdge(from, to) + { + Id = payload.Id, + Name = payload.Name, + Type = payload.Type, + Labels = [.. payload.Labels ?? []], + Weight = payload.Weight, + Purpose = payload.Purpose, + Config = new(payload.Config ?? []) + }); + } + } + + public IEnumerable<(RuleNode, RuleEdge)> GetParentNodes(RuleNode node) + { + return _edges.Where(e => e.To != null && e.To.Id.IsEqualTo(node.Id)) + .OrderByDescending(e => e.Weight) + .Select(e => (e.From, e)) + .ToList(); + } + + public IEnumerable<(RuleNode, RuleEdge)> GetChildrenNodes(RuleNode node) + { + return _edges.Where(e => e.From != null && e.From.Id.IsEqualTo(node.Id)) + .OrderByDescending(e => e.Weight) + .Select(e => (e.To, e)) + .ToList(); + } + + public RuleGraphInfo GetGraphInfo() + { + return new() + { + GraphId = _id, + Nodes = [.. _nodes?.ToList() ?? []], + Edges = [.. _edges?.ToList() ?? []] + }; + } + + public void Clear() + { + _nodes = []; + _edges = []; + } + + public static RuleGraph FromGraphInfo(RuleGraphInfo graphInfo) + { + var graph = new RuleGraph(); + graph.SetGraphId(graphInfo.GraphId.IfNullOrEmptyAs(Guid.NewGuid().ToString())!); + graph.SetNodes(graphInfo.Nodes); + graph.SetEdges(graphInfo.Edges); + return graph; + } + + public override string ToString() + { + return $"Graph ({_id}) => Nodes: {_nodes.Count}, Edges: {_edges.Count}"; + } +} + +public class RuleNode : GraphItem +{ + /// + /// Node type: root, criteria, action, etc. + /// + public override string Type { get; set; } = "action"; + + public override string ToString() + { + return $"Node ({Id}): {Name} ({Type} => {Purpose})"; + } +} + +public class RuleEdge : GraphItem +{ + /// + /// Edge type: is_next, etc. + /// + public override string Type { get; set; } = "next"; + + public RuleNode From { get; set; } + public RuleNode To { get; set; } + + public RuleEdge() : base() + { + + } + + public RuleEdge(RuleNode from, RuleNode to) : base() + { + Id = Guid.NewGuid().ToString(); + From = from; + To = to; + } + + public override string ToString() + { + return $"Edge ({Id}): {Name} ({Type} => {Purpose}), Connects from Node ({From?.Name}) to Node ({To?.Name})"; + } +} + +public class GraphItem +{ + public virtual string Id { get; set; } = Guid.NewGuid().ToString(); + public virtual string Name { get; set; } = null!; + public virtual string Type { get; set; } = null!; + public virtual IEnumerable Labels { get; set; } = []; + public virtual double Weight { get; set; } = 1.0; + public virtual string? Purpose { get; set; } + public virtual Dictionary Config { get; set; } = []; +} + +public class GraphItemPayload : GraphItem +{ +} + +public class RuleGraphInfo +{ + public string GraphId { get; set; } + public IEnumerable Nodes { get; set; } = []; + public IEnumerable Edges { get; set; } = []; +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs b/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs index a36516c8a..c0f858fd5 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs @@ -65,4 +65,70 @@ public static class ObjectExtensions return null; } } + + public static T? TryGetValueOrDefault(this IDictionary dict, string key, T? defaultValue = default, JsonSerializerOptions? jsonOptions = null) + { + return dict.TryGetValue(key, out var value, jsonOptions) + ? value! + : defaultValue; + } + + public static bool TryGetValue(this IDictionary dict, string key, out T? result, JsonSerializerOptions? jsonOptions = null) + { + result = default; + + if (!dict.TryGetValue(key, out var value) || value is null) + { + return false; + } + + if (value is T t) + { + result = t; + return true; + } + + if (value is JsonElement je) + { + try + { + result = je.Deserialize(jsonOptions); + return true; + } + catch + { + return false; + } + } + + return false; + } + + + public static T? TryGetObjectValueOrDefault(this IDictionary dict, string key, T? defaultValue = default, JsonSerializerOptions? jsonOptions = null) where T : class + { + return dict.TryGetObjectValue(key, out var value, jsonOptions) + ? value! + : defaultValue; + } + + public static bool TryGetObjectValue(this IDictionary dict, string key, out T? result, JsonSerializerOptions? jsonOptions = null) where T : class + { + result = default; + + if (!dict.TryGetValue(key, out var value) || value is null) + { + return false; + } + + try + { + result = JsonSerializer.Deserialize(value, jsonOptions); + return true; + } + catch + { + return false; + } + } } diff --git a/src/Infrastructure/BotSharp.Core.Rules/Actions/ChatRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Actions/ChatRuleAction.cs new file mode 100644 index 000000000..e525ad9a0 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Actions/ChatRuleAction.cs @@ -0,0 +1,82 @@ +namespace BotSharp.Core.Rules.Actions; + +public sealed class ChatRuleAction : IRuleAction +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + + public ChatRuleAction( + IServiceProvider services, + ILogger logger) + { + _services = services; + _logger = logger; + } + + public string Name => "send_message_to_agent"; + + public async Task ExecuteAsync( + Agent agent, + IRuleTrigger trigger, + RuleFlowContext context) + { + using var scope = _services.CreateScope(); + var sp = scope.ServiceProvider; + + try + { + var channel = trigger.Channel; + var convService = sp.GetRequiredService(); + var conv = await convService.NewConversation(new Conversation + { + Channel = channel, + Title = context.Text, + AgentId = agent.Id + }); + + var message = new RoleDialogModel(AgentRole.User, context.Text); + + var allStates = new List + { + new("channel", channel) + }; + + if (!context.Parameters.IsNullOrEmpty()) + { + var states = context.Parameters.Where(x => x.Value != null).Select(x => new MessageState(x.Key, x.Value!)); + allStates.AddRange(states); + } + + await convService.SetConversationId(conv.Id, allStates); + await convService.SendMessage(agent.Id, + message, + null, + msg => Task.CompletedTask); + + var data = new Dictionary(convService.States.GetStates() ?? []); + await convService.SaveStates(); + + _logger.LogInformation("Chat rule action executed successfully for agent {AgentId}, conversation {ConversationId}", agent.Id, conv.Id); + + + data["agent_id"] = agent.Id; + data["conversation_id"] = conv.Id; + + return new RuleNodeResult + { + Success = true, + Response = conv.Id, + Data = data + }; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error when sending chat via rule action for agent {AgentId} and trigger {TriggerName}", agent.Id, trigger.Name); + return new RuleNodeResult + { + Success = false, + ErrorMessage = ex.Message + }; + } + } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Actions/FunctionCallRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Actions/FunctionCallRuleAction.cs new file mode 100644 index 000000000..e9b040e76 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Actions/FunctionCallRuleAction.cs @@ -0,0 +1,54 @@ +using BotSharp.Abstraction.Functions; + +namespace BotSharp.Core.Rules.Actions; + +public sealed class FunctionCallRuleAction : IRuleAction +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + + public FunctionCallRuleAction( + IServiceProvider services, + ILogger logger) + { + _services = services; + _logger = logger; + } + + public string Name => "function_call"; + + public async Task ExecuteAsync( + Agent agent, + IRuleTrigger trigger, + RuleFlowContext context) + { + var funcName = context.Parameters.TryGetValue("function_name", out var fName) ? fName : null; + var func = _services.GetServices().FirstOrDefault(x => x.Name.IsEqualTo(funcName)); + + if (func == null) + { + var errorMsg = $"Unable to find function '{funcName}' when running action {agent.Name}-{trigger.Name}"; + _logger.LogWarning(errorMsg); + return new RuleNodeResult + { + Success = false, + ErrorMessage = errorMsg + }; + } + + var funcArg = context.Parameters.TryGetObjectValueOrDefault("function_argument", new()) ?? new(); + await func.Execute(funcArg); + + return new RuleNodeResult + { + Success = true, + Response = funcArg?.RichContent?.Message?.Text ?? funcArg?.Content, + Data = new() + { + ["function_name"] = funcName!, + ["function_argument"] = funcArg?.ConvertToString() ?? "{}", + ["function_call_result"] = funcArg?.RichContent?.Message?.Text ?? funcArg?.Content ?? string.Empty + } + }; + } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Actions/HttpRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Actions/HttpRuleAction.cs new file mode 100644 index 000000000..402f098a9 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Actions/HttpRuleAction.cs @@ -0,0 +1,204 @@ +using System.Net.Mime; +using System.Text.Json; +using System.Web; + +namespace BotSharp.Core.Rules.Actions; + +public sealed class HttpRuleAction : IRuleAction +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + private readonly IHttpClientFactory _httpClientFactory; + + public HttpRuleAction( + IServiceProvider services, + ILogger logger, + IHttpClientFactory httpClientFactory) + { + _services = services; + _logger = logger; + _httpClientFactory = httpClientFactory; + } + + public string Name => "http_request"; + + // Default configuration example: + // { + // "http_url": "https://dummy.example.com/api/v1/employees", + // "http_method": "GET" + // } + + public async Task ExecuteAsync( + Agent agent, + IRuleTrigger trigger, + RuleFlowContext context) + { + try + { + var httpMethod = GetHttpMethod(context); + if (httpMethod == null) + { + var errorMsg = $"HTTP method is not supported in agent rule {agent.Name}-{trigger.Name}"; + _logger.LogWarning(errorMsg); + return new RuleNodeResult + { + Success = false, + ErrorMessage = errorMsg + }; + } + + // Build the full URL + var fullUrl = BuildUrl(context); + + using var client = _httpClientFactory.CreateClient(); + + // Add headers + AddHttpHeaders(client, context); + + // Create request + var request = new HttpRequestMessage(httpMethod, fullUrl); + + // Add request body if provided + var requestBodyStr = GetHttpRequestBody(context); + if (!string.IsNullOrEmpty(requestBodyStr)) + { + request.Content = new StringContent(requestBodyStr, Encoding.UTF8, MediaTypeNames.Application.Json); + } + + _logger.LogInformation("Executing HTTP rule action for agent {AgentId}, URL: {Url}, Method: {Method}", + agent.Id, fullUrl, httpMethod); + + // Send request + var response = await client.SendAsync(request); + var responseContent = await response.Content.ReadAsStringAsync(); + + if (response.IsSuccessStatusCode) + { + _logger.LogInformation("HTTP rule action executed successfully for agent {AgentId}, Status: {StatusCode}, Response: {Response}", + agent.Id, response.StatusCode, responseContent); + + return new RuleNodeResult + { + Success = true, + Response = responseContent, + Data = new() + { + ["http_response_headers"] = JsonSerializer.Serialize(response.Headers), + ["http_response"] = responseContent + } + }; + } + else + { + var errorMsg = $"HTTP request failed with status code {response.StatusCode}: {responseContent}"; + _logger.LogWarning(errorMsg); + return new RuleNodeResult + { + Success = false, + ErrorMessage = errorMsg + }; + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error executing HTTP rule action for agent {AgentId} and trigger {TriggerName}", + agent.Id, trigger.Name); + return new RuleNodeResult + { + Success = false, + ErrorMessage = ex.Message + }; + } + } + + private string BuildUrl(RuleFlowContext context) + { + var url = context.Parameters.GetValueOrDefault("http_url", string.Empty); + if (string.IsNullOrEmpty(url)) + { + throw new ArgumentNullException("Unable to find http_url in context"); + } + + // Fill in placeholders in url + foreach (var param in context.Parameters) + { + var value = param.Value?.ToString(); + if (string.IsNullOrEmpty(value)) + { + continue; + } + url = url.Replace($"{{{param.Key}}}", value); + } + + // Add query parameters + var queryParams = context.Parameters.TryGetObjectValueOrDefault>("http_query_params"); + if (!queryParams.IsNullOrEmpty()) + { + var builder = new UriBuilder(url); + var query = HttpUtility.ParseQueryString(builder.Query); + + // Add new query params + foreach (var kv in queryParams!.Where(x => x.Value != null)) + { + query[kv.Key] = kv.Value!; + } + + // Assign merged query back + builder.Query = query.ToString(); + url = builder.ToString(); + } + + _logger.LogInformation("HTTP url after filling: {Url}", url); + return url; + } + + private HttpMethod? GetHttpMethod(RuleFlowContext context) + { + var method = context.Parameters.GetValueOrDefault("http_method", string.Empty); + var innerMethod = method?.Trim()?.ToUpper(); + HttpMethod? matchMethod = null; + + switch (innerMethod) + { + case "GET": + matchMethod = HttpMethod.Get; + break; + case "POST": + matchMethod = HttpMethod.Post; + break; + case "DELETE": + matchMethod = HttpMethod.Delete; + break; + case "PUT": + matchMethod = HttpMethod.Put; + break; + case "PATCH": + matchMethod = HttpMethod.Patch; + break; + default: + break; + + } + + return matchMethod; + } + + private void AddHttpHeaders(HttpClient client, RuleFlowContext context) + { + var headerParams = context.Parameters.TryGetObjectValueOrDefault>("http_request_headers"); + if (!headerParams.IsNullOrEmpty()) + { + foreach (var header in headerParams!) + { + client.DefaultRequestHeaders.TryAddWithoutValidation(header.Key, header.Value); + } + } + } + + private string? GetHttpRequestBody(RuleFlowContext context) + { + var body = context.Parameters.GetValueOrDefault("http_request_body"); + return body; + } +} + diff --git a/src/Infrastructure/BotSharp.Core.Rules/Conditions/AllVisitedRuleCondition.cs b/src/Infrastructure/BotSharp.Core.Rules/Conditions/AllVisitedRuleCondition.cs new file mode 100644 index 000000000..c9210c690 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Conditions/AllVisitedRuleCondition.cs @@ -0,0 +1,32 @@ +namespace BotSharp.Core.Rules.Conditions; + +public class AllVisitedRuleCondition : IRuleCondition +{ + private readonly ILogger _logger; + + public AllVisitedRuleCondition( + ILogger logger) + { + _logger = logger; + } + + public string Name => "all_visited"; + + public async Task EvaluateAsync( + Agent agent, + IRuleTrigger trigger, + RuleFlowContext context) + { + var currentNode = context.Node; + var parents = context.Graph.GetParentNodes(currentNode); + var parentNodeIds = parents.Select(x => x.Item1.Id).ToList(); + var visitedNodeIds = context.PrevStepResults?.Select(x => x.Node.Id)?.ToHashSet() ?? []; + var allVisited = parentNodeIds.All(x => visitedNodeIds.Contains(x)); + + return new RuleNodeResult + { + Success = allVisited, + Response = allVisited ? "All parent nodes have been visited" : "Missing parenet nodes visiting." + }; + } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Conditions/LoopingCondition.cs b/src/Infrastructure/BotSharp.Core.Rules/Conditions/LoopingCondition.cs new file mode 100644 index 000000000..91f674330 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Conditions/LoopingCondition.cs @@ -0,0 +1,167 @@ +using System.Text.Json; + +namespace BotSharp.Core.Rules.Conditions; + +/// +/// A general loop condition node that iterates over a list of items in context parameters. +/// +/// Expected parameters: +/// - "list_items": A JSON array of items to iterate over (e.g. ["a","b","c"] or [1,2,3] or [{...},{...}]). +/// - "iterate_index": The current iteration index (auto-managed, starts at 0). +/// - "iterate_current_item": The current item being processed (auto-set each iteration). +/// +/// Flow: +/// Action Node → LoopCondition → (true) → back to Action Node +/// → (false) → resets list_items, iterate_index, iterate_current_item and continues +/// +public sealed class LoopingCondition : IRuleCondition +{ + private const string PARAM_LIST_ITEMS = "list_items"; + private const string PARAM_LIST_ITEMS_KEY = "list_items_key"; + private const string PARAM_ITERATE_INDEX = "iterate_index"; + private const string PARAM_ITERATE_ITEM_KEY = "iterate_item_key"; + private const string PARAM_ITERATE_NEXT_ITEM = "iterate_next_item"; + + private readonly ILogger _logger; + + public LoopingCondition(ILogger logger) + { + _logger = logger; + } + + public string Name => "looping"; + + public async Task EvaluateAsync( + Agent agent, + IRuleTrigger trigger, + RuleFlowContext context) + { + try + { + context.Parameters ??= []; + + var listItemsRaw = string.Empty; + var listItemsKey = context.Parameters.GetValueOrDefault(PARAM_LIST_ITEMS_KEY, string.Empty); + if (!string.IsNullOrWhiteSpace(listItemsKey)) + { + listItemsRaw = context.Parameters.GetValueOrDefault(listItemsKey, string.Empty); + } + else + { + listItemsRaw = context.Parameters.GetValueOrDefault(PARAM_LIST_ITEMS, string.Empty); + } + + if (string.IsNullOrWhiteSpace(listItemsRaw)) + { + _logger.LogInformation("Loop condition: list items are empty, loop completed (agent {AgentId}).", agent.Id); + CleanLoopState(context); + return new RuleNodeResult + { + Success = false, + Response = "Loop completed: list_items is empty." + }; + } + + // Deserialize list_items as a JSON array of any type + var items = JsonSerializer.Deserialize(listItemsRaw, new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true + }); + + if (items.IsNullOrEmpty()) + { + _logger.LogInformation("Loop condition: no items to iterate, loop completed (agent {AgentId}).", agent.Id); + CleanLoopState(context); + return new RuleNodeResult + { + Success = false, + Response = "Loop completed: no items in list." + }; + } + + // If iterate_index is not yet set, this is the first visit after the action node + // already handled item[0], so start from index 1. + var indexStr = context.Parameters.GetValueOrDefault(PARAM_ITERATE_INDEX); + int currentIndex; + if (string.IsNullOrEmpty(indexStr)) + { + currentIndex = 0; + } + else if (!int.TryParse(indexStr, out currentIndex)) + { + currentIndex = 0; + } + + var nextIndex = currentIndex + 1; + if (currentIndex >= items!.Length || nextIndex >= items!.Length) + { + _logger.LogInformation("Loop condition: iteration complete ({Index}/{Total}) (agent {AgentId}).", + currentIndex, items.Length, agent.Id); + CleanLoopState(context); + return new RuleNodeResult + { + Success = false, + Response = $"Loop completed: iterated over all {items.Length} items." + }; + } + + // Set next item and advance index + var nextElement = items[nextIndex]; + var nextItem = nextElement.ConvertToString(); + context.Parameters[PARAM_ITERATE_NEXT_ITEM] = nextItem; + context.Parameters[PARAM_ITERATE_INDEX] = nextIndex.ToString(); + + var data = new Dictionary + { + [PARAM_ITERATE_NEXT_ITEM] = nextItem, + [PARAM_ITERATE_INDEX] = nextIndex.ToString() + }; + + + var itemKey = context.Parameters.GetValueOrDefault(PARAM_ITERATE_ITEM_KEY); + if (!string.IsNullOrEmpty(itemKey) + && nextElement.ValueKind == JsonValueKind.Object + && nextElement.TryGetProperty(itemKey, out var fieldValue)) + { + var fieldStr = fieldValue.ToString(); + context.Parameters[itemKey] = fieldStr; + data[itemKey] = fieldStr; + } + + _logger.LogInformation("Loop condition: processing item {Index}/{Total} = '{Item}' (agent {AgentId}).", + nextItem, items.Length, nextElement, agent.Id); + + return new RuleNodeResult + { + Success = true, + Response = $"Loop iteration {nextIndex}/{items.Length}: next item = {nextItem}", + Data = data + }; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error evaluating loop condition for agent {AgentId}", agent.Id); + CleanLoopState(context); + return new RuleNodeResult + { + Success = false, + ErrorMessage = ex.Message + }; + } + } + + private void CleanLoopState(RuleFlowContext context) + { + var itemKey = context.Parameters?.GetValueOrDefault(PARAM_ITERATE_ITEM_KEY); + + context.Parameters?.Remove(PARAM_LIST_ITEMS); + context.Parameters?.Remove(PARAM_ITERATE_INDEX); + context.Parameters?.Remove(PARAM_ITERATE_ITEM_KEY); + context.Parameters?.Remove(PARAM_ITERATE_NEXT_ITEM); + + if (!string.IsNullOrEmpty(itemKey)) + { + context.Parameters?.Remove(itemKey); + } + } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs b/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs new file mode 100644 index 000000000..9598df345 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs @@ -0,0 +1,17 @@ +namespace BotSharp.Core.Rules.Constants; + +public static class RuleConstant +{ + public const int MAX_GRAPH_RECURSION = 50; + + public static IEnumerable CONDITION_NODE_TYPES = new List + { + "condition", + "criteria" + }; + + public static IEnumerable ACTION_NODE_TYPES = new List + { + "action" + }; +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs b/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs new file mode 100644 index 000000000..feb314ef7 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs @@ -0,0 +1,42 @@ +using BotSharp.Core.Rules.Models; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; + +namespace BotSharp.Core.Rules.Controllers; + +[Authorize] +[ApiController] +public class RuleController : ControllerBase +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + private readonly IRuleEngine _ruleEngine; + + public RuleController( + IServiceProvider services, + ILogger logger, + IRuleEngine ruleEngine) + { + _services = services; + _logger = logger; + _ruleEngine = ruleEngine; + } + + [HttpPost("/rule/trigger/action")] + public async Task RunAction([FromBody] RuleTriggerActionRequest request) + { + if (request == null) + { + return BadRequest(new { Success = false, Error = "Request cannnot be empty." }); + } + + var trigger = _services.GetServices().FirstOrDefault(x => x.Name.IsEqualTo(request.TriggerName)); + if (trigger == null) + { + return BadRequest(new { Success = false, Error = "Unable to find rule trigger." }); + } + + var result = await _ruleEngine.Triggered(trigger, request.Text, request.States, request.Options); + return Ok(new { Success = true }); + } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/DemoRuleTrigger.cs b/src/Infrastructure/BotSharp.Core.Rules/DemoRuleTrigger.cs new file mode 100644 index 000000000..70a9eac22 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/DemoRuleTrigger.cs @@ -0,0 +1,10 @@ +namespace BotSharp.Core.Rules; + +public class DemoRuleTrigger : IRuleTrigger +{ + public string Channel => "crontab"; + public string Name => nameof(DemoRuleTrigger); + + public string EntityType { get; set; } = "DemoType"; + public string EntityId { get; set; } = "DemoId"; +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs b/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs index 5f68b722d..8b5d846a9 100644 --- a/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs +++ b/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs @@ -1,36 +1,16 @@ -using BotSharp.Abstraction.Agents.Models; -using BotSharp.Abstraction.Coding; -using BotSharp.Abstraction.Coding.Contexts; -using BotSharp.Abstraction.Coding.Enums; -using BotSharp.Abstraction.Coding.Models; -using BotSharp.Abstraction.Coding.Settings; -using BotSharp.Abstraction.Coding.Utils; -using BotSharp.Abstraction.Conversations; -using BotSharp.Abstraction.Hooks; -using BotSharp.Abstraction.Models; -using BotSharp.Abstraction.Repositories.Filters; -using BotSharp.Abstraction.Rules.Options; -using BotSharp.Abstraction.Utilities; -using Microsoft.Extensions.Logging; -using System.Data; -using System.Text.Json; - namespace BotSharp.Core.Rules.Engines; public class RuleEngine : IRuleEngine { private readonly IServiceProvider _services; private readonly ILogger _logger; - private readonly CodingSettings _codingSettings; public RuleEngine( IServiceProvider services, - ILogger logger, - CodingSettings codingSettings) + ILogger logger) { _services = services; _logger = logger; - _codingSettings = codingSettings; } public async Task> Triggered(IRuleTrigger trigger, string text, IEnumerable? states = null, RuleTriggerOptions? options = null) @@ -39,7 +19,7 @@ public async Task> Triggered(IRuleTrigger trigger, string te // Pull all user defined rules var agentService = _services.GetRequiredService(); - var agents = await agentService.GetAgents(new AgentFilter + var agents = await agentService.GetAgents(options?.AgentFilter ?? new AgentFilter { Pager = new Pagination { @@ -51,154 +31,623 @@ public async Task> Triggered(IRuleTrigger trigger, string te var filteredAgents = agents.Items.Where(x => x.Rules.Exists(r => r.TriggerName.IsEqualTo(trigger.Name) && !x.Disabled)).ToList(); foreach (var agent in filteredAgents) { - // Code trigger - if (options != null) + var rule = agent.Rules.FirstOrDefault(x => x.TriggerName.IsEqualTo(trigger.Name) && !x.Disabled); + if (rule == null) { - var isTriggered = await TriggerCodeScript(agent, trigger.Name, options); - if (!isTriggered) + continue; + } + + var ruleConfig = rule.Config; + var ruleFlowTopologyName = options?.Flow?.TopologyName ?? ruleConfig?.TopologyName; + + if (!string.IsNullOrEmpty(ruleFlowTopologyName)) + { + // Execute graph + // 1. Load graph + var graph = await LoadGraph(ruleFlowTopologyName, agent, trigger, options?.Flow); + if (graph == null) { continue; } - } - var convService = _services.GetRequiredService(); - var conv = await convService.NewConversation(new Conversation + // 2. Get root node + var param = options?.Flow?.Parameters; + var rootNodeName = param != null ? param.GetValueOrDefault("root_node_name")?.ToString() : null; + var root = graph.GetRootNode(rootNodeName); + if (root == null) + { + graph.Clear(); + continue; + } + + // 3. Execute graph + var execResults = new List(); + await ExecuteGraphNode(root, graph, agent, trigger, text, states, null, options, execResults); + graph.Clear(); + + // Get conversation id to support legacy features + var convIds = execResults.Where(x => x.Success && x.Data.TryGetValue("conversation_id", out _)) + .Select(x => x.Data.GetValueOrDefault("conversation_id", string.Empty)) + .Where(x => !string.IsNullOrEmpty(x)) + .ToList(); + + newConversationIds.AddRange(convIds); + } + else { - Channel = trigger.Channel, - Title = text, - AgentId = agent.Id - }); + var convId = await SendMessageToAgent(agent, trigger, text, states); + newConversationIds.Add(convId); + } + } + + return newConversationIds; + } + + public async Task ExecuteGraphNode(RuleNode node, RuleGraph graph, string agentId, IRuleTrigger trigger, RuleNodeExecutionOptions options) + { + if (node == null || graph == null || options == null) + { + return; + } + + var agentService = _services.GetRequiredService(); + var agent = await agentService.GetAgent(agentId); + + var triggerOptions = new RuleTriggerOptions + { + Flow = options.Flow, + JsonOptions = options.JsonOptions + }; - var message = new RoleDialogModel(AgentRole.User, text); + var execResults = new List(); + await ExecuteGraphNode( + node, graph, + agent, trigger, + options.Text, + options.States, + null, + triggerOptions, + execResults); + graph.Clear(); + } - var allStates = new List + #region Graph + private async Task LoadGraph(string name, Agent agent, IRuleTrigger trigger, RuleFlowOptions? options) + { + var flow = _services.GetServices>().FirstOrDefault(x => x.Name.IsEqualTo(name)); + if (flow == null) + { + return null; + } + + try + { + var config = await flow.GetTopologyConfigAsync(options: new() { - new("channel", trigger.Channel) - }; + TopologyName = name + }); - if (states != null) + var topologyId = config?.TopologyId; + if (string.IsNullOrEmpty(topologyId)) { - allStates.AddRange(states); + return null; } - await convService.SetConversationId(conv.Id, allStates); - await convService.SendMessage(agent.Id, - message, - null, - msg => Task.CompletedTask); + var param = new Dictionary(options?.Parameters ?? []); + param["agent"] = param.GetValueOrDefault("agent", agent.Name); + param["agent_id"] = param.GetValueOrDefault("agent_id", agent.Id); + param["trigger"] = param.GetValueOrDefault("trigger", trigger.Name); - await convService.SaveStates(); - newConversationIds.Add(conv.Id); + return await flow.GetTopologyAsync(topologyId, options: new() + { + Query = options?.Query, + Parameters = param + }); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when loading graph (name: {name}, agent: {agent}, trigger: {trigger?.Name})"); + return null; } + } - return newConversationIds; + private async Task ExecuteGraphNode( + RuleNode node, + RuleGraph graph, + Agent agent, + IRuleTrigger trigger, + string text, + IEnumerable? states, + Dictionary? data, + RuleTriggerOptions? options, + List results) + { + try + { + if (options?.Flow?.TraversalAlgorithm?.IsEqualTo("bfs") == true) + { + await ExecuteGraphNodeBfs(node, graph, agent, trigger, text, states, data, options, results); + } + else + { + await ExecuteGraphNodeDfs(node, graph, agent, trigger, text, states, data, options, results); + } + } + catch { } } - #region Private methods - private async Task TriggerCodeScript(Agent agent, string triggerName, RuleTriggerOptions options) + private async Task ExecuteGraphNodeDfs( + RuleNode node, + RuleGraph graph, + Agent agent, + IRuleTrigger trigger, + string text, + IEnumerable? states, + Dictionary? data, + RuleTriggerOptions? options, + List results) { - if (string.IsNullOrWhiteSpace(agent?.Id)) + // Check whether the action nodes have been visited more than limit + var visited = results.Count(); + var param = options?.Flow?.Parameters ?? []; + var maxRecursion = int.TryParse(param.GetValueOrDefault("max_recursion")?.ToString(), out var depth) && depth > 0 + ? depth : RuleConstant.MAX_GRAPH_RECURSION; + + var innerData = new Dictionary(data ?? []); + + if (visited >= maxRecursion) { - return false; + _logger.LogWarning("Exceed max graph recursion {MaxRecursion} (agent {Agent} and trigger {Trigger}).", + maxRecursion, agent.Name, trigger.Name); + return; } - var provider = options.CodeProcessor ?? BuiltInCodeProcessor.PyInterpreter; - var processor = _services.GetServices().FirstOrDefault(x => x.Provider.IsEqualTo(provider)); - if (processor == null) + // Get current node successors + var nextNodes = graph.GetChildrenNodes(node); + if (nextNodes.IsNullOrEmpty()) { - _logger.LogWarning($"Unable to find code processor: {provider}."); - return false; + return; } - var agentService = _services.GetRequiredService(); - var scriptName = options.CodeScriptName ?? $"{triggerName}_rule.py"; - var codeScript = await agentService.GetAgentCodeScript(agent.Id, scriptName, scriptType: AgentCodeScriptType.Src); + // Visit neighbor nodes + foreach (var (nextNode, edge) in nextNodes) + { + // Build context + var context = new RuleFlowContext + { + Node = nextNode, + Edge = edge, + Graph = graph, + Text = text, + Parameters = BuildParameters(nextNode.Config, states, innerData), + PrevStepResults = results, + JsonOptions = options?.JsonOptions + }; + + if (RuleConstant.CONDITION_NODE_TYPES.Contains(nextNode.Type)) + { + // Execute condition node + var conditionResult = await ExecuteCondition(nextNode, graph, agent, trigger, context); + if (conditionResult == null) + { + results.Add(RuleFlowStepResult.FromResult(new() + { + Success = false, + ErrorMessage = $"Unable to find condition {nextNode.Name}." + }, nextNode)); + continue; + } + + results.Add(RuleFlowStepResult.FromResult(conditionResult, nextNode)); + + // If condition result is true, then execute the next node, otherwise skip + if (conditionResult.Success) + { + await ExecuteGraphNodeDfs(nextNode, graph, agent, trigger, text, states, context.Parameters, options, results); + } + else + { + _logger.LogInformation("Condition {ConditionName} evaluated to false, skipping next node (agent {Agent} and trigger {Trigger}).", + nextNode.Name, agent.Name, trigger.Name); + } + } + else if (RuleConstant.ACTION_NODE_TYPES.Contains(nextNode.Type)) + { + // Execute action node + var actionResult = await ExecuteAction(nextNode, graph, agent, trigger, context); + if (actionResult == null) + { + results.Add(RuleFlowStepResult.FromResult(new() + { + Success = false, + ErrorMessage = $"Unable to find action {nextNode.Name}." + }, nextNode)); + continue; + } + + results.Add(RuleFlowStepResult.FromResult(actionResult, nextNode)); - var msg = $"rule trigger ({triggerName}) code script ({scriptName}) in agent ({agent.Name}) => args: {options.ArgumentContent?.RootElement.GetRawText()}."; + if (actionResult.IsDelayed) + { + continue; + } - if (codeScript == null || string.IsNullOrWhiteSpace(codeScript.Content)) + await ExecuteGraphNodeDfs(nextNode, graph, agent, trigger, text, states, context.Parameters, options, results); + } + else + { + results.Add(RuleFlowStepResult.FromResult(new() + { + Success = true, + Response = $"Pass through node {nextNode.Name}." + }, nextNode)); + await ExecuteGraphNodeDfs(nextNode, graph, agent, trigger, text, states, context.Parameters, options, results); + } + } + } + + private async Task ExecuteGraphNodeBfs( + RuleNode root, + RuleGraph graph, + Agent agent, + IRuleTrigger trigger, + string text, + IEnumerable? states, + Dictionary? data, + RuleTriggerOptions? options, + List results) + { + var param = options?.Flow?.Parameters ?? []; + var maxRecursion = int.TryParse(param.GetValueOrDefault("max_recursion")?.ToString(), out var depth) && depth > 0 + ? depth : RuleConstant.MAX_GRAPH_RECURSION; + + var innerData = new Dictionary(data ?? []); + + // Each queue entry is (node-to-process, edge-that-leads-to-it) + var queue = new Queue<(RuleNode Node, RuleEdge Edge)>(); + + foreach (var (childNode, edge) in graph.GetChildrenNodes(root)) { - _logger.LogWarning($"Unable to find {msg}."); - return false; + queue.Enqueue((childNode, edge)); } - try + while (queue.Count > 0) { - var hooks = _services.GetHooks(agent.Id); + if (results.Count >= maxRecursion) + { + _logger.LogWarning("Exceed max graph nodes {MaxNodes} during BFS (agent {Agent} and trigger {Trigger}).", + maxRecursion, agent.Name, trigger.Name); + break; + } - var arguments = BuildArguments(options.ArgumentName, options.ArgumentContent); - var context = new CodeExecutionContext + var (nextNode, nextEdge) = queue.Dequeue(); + + var context = new RuleFlowContext { - CodeScript = codeScript, - Arguments = arguments + Node = nextNode, + Edge = nextEdge, + Graph = graph, + Text = text, + Parameters = BuildParameters(nextNode.Config, states, innerData), + PrevStepResults = results, + JsonOptions = options?.JsonOptions }; - foreach (var hook in hooks) + if (RuleConstant.CONDITION_NODE_TYPES.Contains(nextNode.Type)) + { + // Execute condition node + var conditionResult = await ExecuteCondition(nextNode, graph, agent, trigger, context); + innerData = new(context.Parameters ?? []); + + if (conditionResult == null) + { + results.Add(RuleFlowStepResult.FromResult(new() + { + Success = false, + ErrorMessage = $"Unable to find condition {nextNode.Name}." + }, nextNode)); + continue; + } + + results.Add(RuleFlowStepResult.FromResult(conditionResult, nextNode)); + + // If condition is true, enqueue children; otherwise skip the branch + if (conditionResult.Success) + { + foreach (var (childNode, childEdge) in graph.GetChildrenNodes(nextNode)) + { + queue.Enqueue((childNode, childEdge)); + } + } + else + { + _logger.LogInformation("Condition {ConditionName} evaluated to false, skipping next node (agent {Agent} and trigger {Trigger}).", + nextNode.Name, agent.Name, trigger.Name); + } + } + else if (RuleConstant.ACTION_NODE_TYPES.Contains(nextNode.Type)) + { + // Execute action node + var actionResult = await ExecuteAction(nextNode, graph, agent, trigger, context); + innerData = new(context.Parameters ?? []); + + if (actionResult == null) + { + results.Add(RuleFlowStepResult.FromResult(new() + { + Success = false, + ErrorMessage = $"Unable to find action {nextNode.Name}." + }, nextNode)); + continue; + } + + results.Add(RuleFlowStepResult.FromResult(actionResult, nextNode)); + + if (!actionResult.IsDelayed) + { + foreach (var (childNode, childEdge) in graph.GetChildrenNodes(nextNode)) + { + queue.Enqueue((childNode, childEdge)); + } + } + } + else { - await hook.BeforeCodeExecution(agent, context); + results.Add(RuleFlowStepResult.FromResult(new() + { + Success = true, + Response = $"Pass through node {nextNode.Name}." + }, nextNode)); + + foreach (var (childNode, childEdge) in graph.GetChildrenNodes(nextNode)) + { + queue.Enqueue((childNode, childEdge)); + } } + } + } + #endregion - var (useLock, useProcess, timeoutSeconds) = CodingUtil.GetCodeExecutionConfig(_codingSettings); - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds)); - var response = processor.Run(codeScript.Content, options: new() + + #region Action + private async Task ExecuteAction( + RuleNode node, + RuleGraph graph, + Agent agent, + IRuleTrigger trigger, + RuleFlowContext context) + { + try + { + // Find the matching action + var foundAction = GetRuleAction(node, agent, trigger); + if (foundAction == null) { - ScriptName = scriptName, - Arguments = arguments, - UseLock = useLock, - UseProcess = useProcess - }, cancellationToken: cts.Token); + var errorMsg = $"No rule action {node?.Name} is found"; + _logger.LogWarning(errorMsg); + return null; + } - var codeResponse = new CodeExecutionResponseModel + _logger.LogInformation("Start execution rule action {ActionName} for agent {AgentId} with trigger {TriggerName}", + foundAction.Name, agent.Id, trigger.Name); + + var hooks = _services.GetHooks(agent.Id); + foreach (var hook in hooks) { - CodeProcessor = processor.Provider, - CodeScript = codeScript, - Arguments = arguments.DistinctBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value ?? string.Empty), - ExecutionResult = response - }; + await hook.BeforeRuleActionExecuting(agent, node, trigger, context); + } + + // Execute action + context.Parameters ??= []; + var result = await foundAction.ExecuteAsync(agent, trigger, context); foreach (var hook in hooks) { - await hook.AfterCodeExecution(agent, codeResponse); + await hook.AfterRuleActionExecuted(agent, node, trigger, context, result); } - if (response == null || !response.Success) + return result; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error executing rule action {ActionName} for agent {AgentId}", node?.Name, agent.Id); + return new RuleNodeResult + { + Success = false, + ErrorMessage = ex.Message + }; + } + } + + // Find the matching action + private IRuleAction? GetRuleAction(RuleNode node, Agent agent, IRuleTrigger trigger) + { + var actions = _services.GetServices() + .Where(x => x.Name.IsEqualTo(node?.Name)) + .ToList(); + + var found = actions.FirstOrDefault(x => !string.IsNullOrEmpty(x.AgentId) && x.AgentId.IsEqualTo(agent.Id) && x.Triggers?.Contains(trigger.Name) == true); + if (found != null) + { + return found; + } + + found = actions.FirstOrDefault(x => !string.IsNullOrEmpty(x.AgentId) && x.AgentId.IsEqualTo(agent.Id)); + if (found != null) + { + return found; + } + + found = actions.FirstOrDefault(x => x.Triggers?.Contains(trigger.Name, StringComparer.OrdinalIgnoreCase) == true); + if (found != null) + { + return found; + } + + found = actions.FirstOrDefault(); + if (found != null) + { + return found; + } + + return null; + } + #endregion + + + #region Condition + private async Task ExecuteCondition( + RuleNode node, + RuleGraph graph, + Agent agent, + IRuleTrigger trigger, + RuleFlowContext context) + { + try + { + // Find the matching condition + var foundCondition = GetRuleCondition(node, agent, trigger); + if (foundCondition == null) { - _logger.LogWarning($"Failed to handle {msg}"); - return false; + var errorMsg = $"No rule condition {node?.Name} is found"; + _logger.LogWarning(errorMsg); + return null; } - bool result; - LogLevel logLevel; - if (response.Result.IsEqualTo("true")) + _logger.LogInformation("Start execution rule condition {ConditionName} for agent {AgentId} with trigger {TriggerName}", + foundCondition.Name, agent.Id, trigger.Name); + + var hooks = _services.GetHooks(agent.Id); + foreach (var hook in hooks) { - logLevel = LogLevel.Information; - result = true; + await hook.BeforeRuleConditionExecuting(agent, node, trigger, context); } - else + + // Execute condition + context.Parameters ??= []; + var result = await foundCondition.EvaluateAsync(agent, trigger, context); + + foreach (var hook in hooks) { - logLevel = LogLevel.Warning; - result = false; + await hook.AfterRuleConditionExecuted(agent, node, trigger, context, result); } - _logger.Log(logLevel, $"Code script execution result ({response}) from {msg}"); return result; } catch (Exception ex) { - _logger.LogError(ex, $"Error when handling {msg}"); - return false; + _logger.LogError(ex, "Error executing rule condition {ConditionName} for agent {AgentId}", node?.Name, agent.Id); + return new RuleNodeResult + { + Success = false, + ErrorMessage = ex.Message + }; + } + } + + // Find the matching condition + private IRuleCondition? GetRuleCondition(RuleNode node, Agent agent, IRuleTrigger trigger) + { + var conditions = _services.GetServices() + .Where(x => x.Name.IsEqualTo(node?.Name)) + .ToList(); + + var found = conditions.FirstOrDefault(x => !string.IsNullOrEmpty(x.AgentId) && x.AgentId.IsEqualTo(agent.Id) && x.Triggers?.Contains(trigger.Name) == true); + if (found != null) + { + return found; } + + found = conditions.FirstOrDefault(x => !string.IsNullOrEmpty(x.AgentId) && x.AgentId.IsEqualTo(agent.Id)); + if (found != null) + { + return found; + } + + found = conditions.FirstOrDefault(x => x.Triggers?.Contains(trigger.Name, StringComparer.OrdinalIgnoreCase) == true); + if (found != null) + { + return found; + } + + found = conditions.FirstOrDefault(); + if (found != null) + { + return found; + } + + return null; } + #endregion - private List BuildArguments(string? name, JsonDocument? args) + + #region Private methods + private Dictionary BuildParameters( + Dictionary? config, + IEnumerable? states, + Dictionary? param = null) { - var keyValues = new List(); - if (args != null) + var dict = new Dictionary(); + + if (config != null) { - keyValues.Add(new KeyValue(name ?? "trigger_args", args.RootElement.GetRawText())); + dict = new(config); } - return keyValues; + + if (!states.IsNullOrEmpty()) + { + foreach (var state in states!) + { + dict[state.Key] = state.Value?.ConvertToString(); + } + } + + if (!param.IsNullOrEmpty()) + { + foreach (var pair in param!) + { + dict[pair.Key] = pair.Value; + } + } + + return dict; + } + #endregion + + + #region Legacy conversation + private async Task SendMessageToAgent(Agent agent, IRuleTrigger trigger, string text, IEnumerable? states = null) + { + var convService = _services.GetRequiredService(); + var conv = await convService.NewConversation(new Conversation + { + Channel = trigger.Channel, + Title = text, + AgentId = agent.Id + }); + + var message = new RoleDialogModel(AgentRole.User, text); + + var allStates = new List + { + new("channel", trigger.Channel) + }; + + if (!states.IsNullOrEmpty()) + { + allStates.AddRange(states!); + } + + await convService.SetConversationId(conv.Id, allStates); + await convService.SendMessage(agent.Id, + message, + null, + msg => Task.CompletedTask); + + await convService.SaveStates(); + + return conv.Id; } #endregion } diff --git a/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs new file mode 100644 index 000000000..1c5e81d71 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs @@ -0,0 +1,11 @@ +namespace BotSharp.Core.Rules.Models; + +public class RuleMessagePayload +{ + public string AgentId { get; set; } + public string TriggerName { get; set; } + public string Channel { get; set; } + public string Text { get; set; } + public Dictionary States { get; set; } + public DateTime Timestamp { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs new file mode 100644 index 000000000..0abea08b0 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs @@ -0,0 +1,18 @@ +using System.Text.Json.Serialization; + +namespace BotSharp.Core.Rules.Models; + +public class RuleTriggerActionRequest +{ + [JsonPropertyName("trigger_name")] + public string TriggerName { get; set; } + + [JsonPropertyName("text")] + public string Text { get; set; } = string.Empty; + + [JsonPropertyName("states")] + public IEnumerable? States { get; set; } + + [JsonPropertyName("options")] + public RuleTriggerOptions? Options { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs b/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs index 56e1fb8ae..ace6553ae 100644 --- a/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs +++ b/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs @@ -1,3 +1,5 @@ +using BotSharp.Core.Rules.Actions; +using BotSharp.Core.Rules.Conditions; using BotSharp.Core.Rules.Engines; namespace BotSharp.Core.Rules; @@ -16,6 +18,21 @@ public class RulesPlugin : IBotSharpPlugin public void RegisterDI(IServiceCollection services, IConfiguration config) { + // Register rule engine services.AddScoped(); + + // Register rule actions + services.AddScoped(); + services.AddScoped(); + services.AddScoped(); + +#if DEBUG + // Register rule conditions + services.AddScoped(); + services.AddScoped(); + + // Register rule trigger + services.AddScoped(); +#endif } } diff --git a/src/Infrastructure/BotSharp.Core.Rules/Using.cs b/src/Infrastructure/BotSharp.Core.Rules/Using.cs index a4353c960..2d1dc6844 100644 --- a/src/Infrastructure/BotSharp.Core.Rules/Using.cs +++ b/src/Infrastructure/BotSharp.Core.Rules/Using.cs @@ -1,5 +1,7 @@ global using Microsoft.Extensions.Configuration; global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Logging; +global using System.Text; global using BotSharp.Abstraction.Agents.Enums; global using BotSharp.Abstraction.Plugins; @@ -8,4 +10,24 @@ global using BotSharp.Abstraction.Instructs; global using BotSharp.Abstraction.Instructs.Models; -global using BotSharp.Abstraction.Rules; \ No newline at end of file +global using BotSharp.Abstraction.Agents.Models; +global using BotSharp.Abstraction.Conversations; + +global using BotSharp.Abstraction.Infrastructures.MessageQueues; +global using BotSharp.Abstraction.Infrastructures.MessageQueues.Models; +global using BotSharp.Abstraction.Models; +global using BotSharp.Abstraction.Repositories.Filters; +global using BotSharp.Abstraction.Rules; +global using BotSharp.Abstraction.Rules.Options; +global using BotSharp.Abstraction.Rules.Models; +global using BotSharp.Abstraction.Rules.Hooks; +global using BotSharp.Abstraction.Utilities; +global using BotSharp.Abstraction.Coding; +global using BotSharp.Abstraction.Coding.Contexts; +global using BotSharp.Abstraction.Coding.Enums; +global using BotSharp.Abstraction.Coding.Models; +global using BotSharp.Abstraction.Coding.Utils; +global using BotSharp.Abstraction.Coding.Settings; +global using BotSharp.Abstraction.Hooks; + +global using BotSharp.Core.Rules.Constants; \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs b/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs index f0ec11c23..6995b49ec 100644 --- a/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs +++ b/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs @@ -195,7 +195,7 @@ public async Task Execute( foreach (var hook in hooks) { await hook.AfterCompletion(agent, instructResult); - await hook.AfterCodeExecution(agent, codeExecution); + await hook.AfterCodeExecution(agent, context, codeExecution); } return instructResult; diff --git a/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs b/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs new file mode 100644 index 000000000..5c84fcb63 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs @@ -0,0 +1,18 @@ +using BotSharp.Abstraction.Infrastructures.MessageQueues; +using Microsoft.Extensions.Configuration; + +namespace BotSharp.Core.Messaging; + +public class MessagingPlugin : IBotSharpPlugin +{ + public string Id => "52a0aa30-4820-42a9-9cae-df0be81bad2b"; + public string Name => "Messaging"; + public string Description => "Provides message queue services."; + + public void RegisterDI(IServiceCollection services, IConfiguration config) + { + var mqSettings = new MessageQueueSettings(); + config.Bind("MessageQueue", mqSettings); + services.AddSingleton(mqSettings); + } +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/agent.json b/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/agent.json index e958bcb31..f09d98554 100644 --- a/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/agent.json +++ b/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/agent.json @@ -14,5 +14,10 @@ "model": "gpt-5-mini", "max_recursion_depth": 3, "reasoning_effort_level": "minimal" - } + }, + "rules": [ + { + "trigger_name": "DemoRuleTrigger" + } + ] } \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs b/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs index 6ef8c5935..117feaebe 100644 --- a/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs +++ b/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs @@ -1,9 +1,11 @@ +using BotSharp.Abstraction.Coding.Contexts; using BotSharp.Abstraction.Coding.Models; using BotSharp.Abstraction.Instructs.Models; using BotSharp.Abstraction.Instructs.Settings; using BotSharp.Abstraction.Loggers.Models; using BotSharp.Abstraction.Users; using BotSharp.Abstraction.Utilities; +using System; namespace BotSharp.Logger.Hooks; @@ -61,7 +63,7 @@ await db.SaveInstructionLogs(new List await base.OnResponseGenerated(response); } - public override async Task AfterCodeExecution(Agent agent, CodeExecutionResponseModel response) + public override async Task AfterCodeExecution(Agent agent, CodeExecutionContext context, CodeExecutionResponseModel response) { if (response == null || !IsLoggingEnabled(agent?.Id)) { @@ -88,7 +90,7 @@ await db.SaveInstructionLogs(new List } }); - await base.AfterCodeExecution(agent, response); + await base.AfterCodeExecution(agent, context, response); } private bool IsLoggingEnabled(string? agentId) diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs index 52fd719fd..43cf228c4 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs +++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs @@ -1,5 +1,5 @@ -using BotSharp.Abstraction.Agents.Models; using BotSharp.Abstraction.Rules; +using BotSharp.Abstraction.Rules.Models; namespace BotSharp.OpenAPI.Controllers; @@ -18,9 +18,22 @@ public IEnumerable GetRuleTriggers() }).OrderBy(x => x.TriggerName); } - [HttpGet("/rule/formalization")] - public async Task GetFormalizedRuleDefinition([FromBody] AgentRule rule) + [HttpGet("/rule/config/options")] + public async Task> GetRuleConfigOptions() { - return "{}"; + var dict = new Dictionary(); + var flows = _services.GetServices>(); + + foreach (var flow in flows) + { + var config = await flow.GetTopologyConfigAsync(); + if (string.IsNullOrEmpty(config.TopologyName)) + { + continue; + } + dict[config.TopologyName] = config; + } + + return dict; } } diff --git a/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs b/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs index be189898e..8e29bb1bb 100644 --- a/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs +++ b/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs @@ -84,7 +84,7 @@ private async Task SendRequest(string url, GraphQueryRequest r } catch (Exception ex) { - _logger.LogError(ex, $"Error when fetching Lessen GLM response (Endpoint: {url})."); + _logger.LogError(ex, $"Error when fetching {Provider} Graph db response (Endpoint: {url})."); return result; } } diff --git a/src/Plugins/BotSharp.Plugin.Membase/Controllers/MembaseController.cs b/src/Plugins/BotSharp.Plugin.Membase/Controllers/MembaseController.cs index dbc7393a8..52f3968dd 100644 --- a/src/Plugins/BotSharp.Plugin.Membase/Controllers/MembaseController.cs +++ b/src/Plugins/BotSharp.Plugin.Membase/Controllers/MembaseController.cs @@ -1,4 +1,5 @@ using BotSharp.Abstraction.Graph; +using BotSharp.Plugin.Membase.Interfaces; using Microsoft.AspNetCore.Http; namespace BotSharp.Plugin.Membase.Controllers; @@ -8,11 +9,46 @@ namespace BotSharp.Plugin.Membase.Controllers; public class MembaseController : ControllerBase { private readonly IServiceProvider _services; + private readonly IMembaseApi _membaseApi; public MembaseController( - IServiceProvider services) + IServiceProvider services, + IMembaseApi membaseApi) { _services = services; + _membaseApi = membaseApi; + } + + /// + /// Get graph information + /// + /// The graph identifier + /// Graph information +#if DEBUG + [AllowAnonymous] +#endif + [HttpGet("/membase/{graphId}")] + [ProducesResponseType(StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status400BadRequest)] + [ProducesResponseType(StatusCodes.Status500InternalServerError)] + public async Task GetGraphInfo(string graphId) + { + if (string.IsNullOrWhiteSpace(graphId)) + { + return BadRequest("Graph ID cannot be empty."); + } + + try + { + var graphInfo = await _membaseApi.GetGraphInfoAsync(graphId); + return Ok(graphInfo); + } + catch (Exception ex) + { + return StatusCode( + StatusCodes.Status500InternalServerError, + new { message = "An error occurred while retrieving graph information.", error = ex.Message }); + } } /// @@ -21,6 +57,9 @@ public MembaseController( /// The graph identifier /// The Cypher query request containing the query and parameters /// Query result with columns and data +#if DEBUG + [AllowAnonymous] +#endif [HttpPost("/membase/{graphId}/query")] [ProducesResponseType(StatusCodes.Status200OK)] [ProducesResponseType(StatusCodes.Status400BadRequest)] @@ -58,4 +97,323 @@ public async Task ExecuteGraphQuery(string graphId, [FromBody] Cy new { message = "An error occurred while executing the query.", error = ex.Message }); } } + + /// + /// Get a node from the graph + /// + /// The graph identifier + /// The node identifier + /// The node +#if DEBUG + [AllowAnonymous] +#endif + [HttpGet("/membase/{graphId}/node/{nodeId}")] + [ProducesResponseType(StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status400BadRequest)] + [ProducesResponseType(StatusCodes.Status500InternalServerError)] + public async Task GetNode(string graphId, string nodeId) + { + if (string.IsNullOrWhiteSpace(graphId)) + { + return BadRequest("Graph ID cannot be empty."); + } + + if (string.IsNullOrWhiteSpace(nodeId)) + { + return BadRequest("Node ID cannot be empty."); + } + + try + { + var node = await _membaseApi.GetNodeAsync(graphId, nodeId); + return Ok(node); + } + catch (Exception ex) + { + return StatusCode( + StatusCodes.Status500InternalServerError, + new { message = "An error occurred while retrieving the node.", error = ex.Message }); + } + } + + /// + /// Create a node in the graph + /// + /// The graph identifier + /// The node creation model + /// The created node +#if DEBUG + [AllowAnonymous] +#endif + [HttpPost("/membase/{graphId}/node")] + [ProducesResponseType(StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status400BadRequest)] + [ProducesResponseType(StatusCodes.Status500InternalServerError)] + public async Task CreateNode(string graphId, [FromBody] NodeCreationModel request) + { + if (string.IsNullOrWhiteSpace(graphId)) + { + return BadRequest("Graph ID cannot be empty."); + } + + if (request == null) + { + return BadRequest("Node creation model cannot be null."); + } + + try + { + var node = await _membaseApi.CreateNodeAsync(graphId, request); + return Ok(node); + } + catch (Exception ex) + { + return StatusCode( + StatusCodes.Status500InternalServerError, + new { message = "An error occurred while creating the node.", error = ex.Message }); + } + } + + /// + /// Merge a node in the graph + /// + /// The graph identifier + /// The node update model + /// The merged node +#if DEBUG + [AllowAnonymous] +#endif + [HttpPut("/membase/{graphId}/node/merge")] + [ProducesResponseType(StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status400BadRequest)] + [ProducesResponseType(StatusCodes.Status500InternalServerError)] + public async Task MergeNode(string graphId, [FromBody] NodeUpdateModel request) + { + if (string.IsNullOrWhiteSpace(graphId)) + { + return BadRequest("Graph ID cannot be empty."); + } + + if (string.IsNullOrWhiteSpace(request?.Id)) + { + return BadRequest("Node ID cannot be empty."); + } + + try + { + var node = await _membaseApi.MergeNodeAsync(graphId, request.Id, request); + return Ok(node); + } + catch (Exception ex) + { + return StatusCode( + StatusCodes.Status500InternalServerError, + new { message = "An error occurred while merging the node.", error = ex.Message }); + } + } + + /// + /// Delete a node from the graph + /// + /// The graph identifier + /// The node identifier + /// Delete response +#if DEBUG + [AllowAnonymous] +#endif + [HttpDelete("/membase/{graphId}/node/{nodeId}")] + [ProducesResponseType(StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status400BadRequest)] + [ProducesResponseType(StatusCodes.Status500InternalServerError)] + public async Task DeleteNode(string graphId, string nodeId) + { + if (string.IsNullOrWhiteSpace(graphId)) + { + return BadRequest("Graph ID cannot be empty."); + } + + if (string.IsNullOrWhiteSpace(nodeId)) + { + return BadRequest("Node ID cannot be empty."); + } + + try + { + await _membaseApi.DeleteNodeAsync(graphId, nodeId); + return Ok("done"); + } + catch (Exception ex) + { + return StatusCode( + StatusCodes.Status500InternalServerError, + new { message = "An error occurred while deleting the node.", error = ex.Message }); + } + } + + /// + /// Get an edge from the graph + /// + /// The graph identifier + /// The edge identifier + /// The edge +#if DEBUG + [AllowAnonymous] +#endif + [HttpGet("/membase/{graphId}/edge/{edgeId}")] + [ProducesResponseType(StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status400BadRequest)] + [ProducesResponseType(StatusCodes.Status500InternalServerError)] + public async Task GetEdge(string graphId, string edgeId) + { + if (string.IsNullOrWhiteSpace(graphId)) + { + return BadRequest("Graph ID cannot be empty."); + } + + if (string.IsNullOrWhiteSpace(edgeId)) + { + return BadRequest("Edge ID cannot be empty."); + } + + try + { + var edge = await _membaseApi.GetEdgeAsync(graphId, edgeId); + return Ok(edge); + } + catch (Exception ex) + { + return StatusCode( + StatusCodes.Status500InternalServerError, + new { message = "An error occurred while retrieving the edge.", error = ex.Message }); + } + } + + /// + /// Create an edge in the graph + /// + /// The graph identifier + /// The edge creation model + /// The created edge +#if DEBUG + [AllowAnonymous] +#endif + [HttpPost("/membase/{graphId}/edge")] + [ProducesResponseType(StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status400BadRequest)] + [ProducesResponseType(StatusCodes.Status500InternalServerError)] + public async Task CreateEdge(string graphId, [FromBody] EdgeCreationModel request) + { + if (string.IsNullOrWhiteSpace(graphId)) + { + return BadRequest("Graph ID cannot be empty."); + } + + if (request == null) + { + return BadRequest("Edge creation model cannot be null."); + } + + if (string.IsNullOrWhiteSpace(request.SourceNodeId)) + { + return BadRequest("Source node ID cannot be empty."); + } + + if (string.IsNullOrWhiteSpace(request.TargetNodeId)) + { + return BadRequest("Target node ID cannot be empty."); + } + + if (string.IsNullOrWhiteSpace(request.Type)) + { + return BadRequest("Edge type cannot be empty."); + } + + try + { + var edge = await _membaseApi.CreateEdgeAsync(graphId, request); + return Ok(edge); + } + catch (Exception ex) + { + return StatusCode( + StatusCodes.Status500InternalServerError, + new { message = "An error occurred while creating the edge.", error = ex.Message }); + } + } + + /// + /// Update an edge in the graph + /// + /// The graph identifier + /// The edge update model + /// The updated edge +#if DEBUG + [AllowAnonymous] +#endif + [HttpPut("/membase/{graphId}/edge")] + [ProducesResponseType(StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status400BadRequest)] + [ProducesResponseType(StatusCodes.Status500InternalServerError)] + public async Task UpdateEdge(string graphId, [FromBody] EdgeUpdateModel request) + { + if (string.IsNullOrWhiteSpace(graphId)) + { + return BadRequest("Graph ID cannot be empty."); + } + + if (string.IsNullOrWhiteSpace(request?.Id)) + { + return BadRequest("Edge ID cannot be empty."); + } + + try + { + var edge = await _membaseApi.UpdateEdgeAsync(graphId, request.Id, request); + return Ok(edge); + } + catch (Exception ex) + { + return StatusCode( + StatusCodes.Status500InternalServerError, + new { message = "An error occurred while updating the edge.", error = ex.Message }); + } + } + + /// + /// Delete an edge from the graph + /// + /// The graph identifier + /// The edge identifier + /// Delete response +#if DEBUG + [AllowAnonymous] +#endif + [HttpDelete("/membase/{graphId}/edge/{edgeId}")] + [ProducesResponseType(StatusCodes.Status200OK)] + [ProducesResponseType(StatusCodes.Status400BadRequest)] + [ProducesResponseType(StatusCodes.Status500InternalServerError)] + public async Task DeleteEdge(string graphId, string edgeId) + { + if (string.IsNullOrWhiteSpace(graphId)) + { + return BadRequest("Graph ID cannot be empty."); + } + + if (string.IsNullOrWhiteSpace(edgeId)) + { + return BadRequest("Edge ID cannot be empty."); + } + + try + { + await _membaseApi.DeleteEdgeAsync(graphId, edgeId); + return Ok("done"); + } + catch (Exception ex) + { + return StatusCode( + StatusCodes.Status500InternalServerError, + new { message = "An error occurred while deleting the edge.", error = ex.Message }); + } + } } diff --git a/src/Plugins/BotSharp.Plugin.Membase/GraphDb/MembaseGraphDb.cs b/src/Plugins/BotSharp.Plugin.Membase/GraphDb/MembaseGraphDb.cs index 1bcca821d..30209eba5 100644 --- a/src/Plugins/BotSharp.Plugin.Membase/GraphDb/MembaseGraphDb.cs +++ b/src/Plugins/BotSharp.Plugin.Membase/GraphDb/MembaseGraphDb.cs @@ -4,6 +4,7 @@ using BotSharp.Abstraction.Models; using BotSharp.Abstraction.Options; using BotSharp.Abstraction.Utilities; +using BotSharp.Plugin.Membase.Interfaces; using Microsoft.Extensions.Logging; using System.Text.Json; @@ -38,7 +39,7 @@ public async Task ExecuteQueryAsync(string query, GraphQueryEx try { - var response = await _membaseApi.CypherQueryAsync(options.GraphId, new CypherQueryRequest + var response = await _membaseApi.CypherQueryAsync(options!.GraphId, new CypherQueryRequest { Query = query, Parameters = args diff --git a/src/Plugins/BotSharp.Plugin.Membase/Services/MembaseAuthHandler.cs b/src/Plugins/BotSharp.Plugin.Membase/Handlers/MembaseAuthHandler.cs similarity index 95% rename from src/Plugins/BotSharp.Plugin.Membase/Services/MembaseAuthHandler.cs rename to src/Plugins/BotSharp.Plugin.Membase/Handlers/MembaseAuthHandler.cs index 7dea503c8..c7b511f59 100644 --- a/src/Plugins/BotSharp.Plugin.Membase/Services/MembaseAuthHandler.cs +++ b/src/Plugins/BotSharp.Plugin.Membase/Handlers/MembaseAuthHandler.cs @@ -2,7 +2,7 @@ using System.Net.Http; using System.Threading; -namespace BotSharp.Plugin.Membase.Services; +namespace BotSharp.Plugin.Membase.Handlers; public class MembaseAuthHandler : DelegatingHandler { diff --git a/src/Plugins/BotSharp.Plugin.Membase/Services/IMembaseApi.cs b/src/Plugins/BotSharp.Plugin.Membase/Interfaces/IMembaseApi.cs similarity index 87% rename from src/Plugins/BotSharp.Plugin.Membase/Services/IMembaseApi.cs rename to src/Plugins/BotSharp.Plugin.Membase/Interfaces/IMembaseApi.cs index 85c0b3b17..1c294060a 100644 --- a/src/Plugins/BotSharp.Plugin.Membase/Services/IMembaseApi.cs +++ b/src/Plugins/BotSharp.Plugin.Membase/Interfaces/IMembaseApi.cs @@ -1,7 +1,7 @@ using BotSharp.Plugin.Membase.Models.Graph; using Refit; -namespace BotSharp.Plugin.Membase.Services; +namespace BotSharp.Plugin.Membase.Interfaces; /// /// Membase REST API interface @@ -29,7 +29,7 @@ public interface IMembaseApi Task MergeNodeAsync(string graphId, string nodeId, [Body] NodeUpdateModel node); [Delete("/graph/{graphId}/node/{nodeId}")] - Task DeleteNodeAsync(string graphId, string nodeId); + Task DeleteNodeAsync(string graphId, string nodeId); #endregion #region Edge @@ -43,6 +43,6 @@ public interface IMembaseApi Task UpdateEdgeAsync(string graphId, string edgeId, [Body] EdgeUpdateModel edge); [Delete("/graph/{graphId}/edge/{edgeId}")] - Task DeleteEdgeAsync(string graphId, string edgeId); + Task DeleteEdgeAsync(string graphId, string edgeId); #endregion } diff --git a/src/Plugins/BotSharp.Plugin.Membase/MembasePlugin.cs b/src/Plugins/BotSharp.Plugin.Membase/MembasePlugin.cs index 32e06d2c7..87c6156db 100644 --- a/src/Plugins/BotSharp.Plugin.Membase/MembasePlugin.cs +++ b/src/Plugins/BotSharp.Plugin.Membase/MembasePlugin.cs @@ -1,6 +1,9 @@ using BotSharp.Abstraction.Graph; using BotSharp.Abstraction.Plugins.Models; +using BotSharp.Abstraction.Rules; using BotSharp.Plugin.Membase.GraphDb; +using BotSharp.Plugin.Membase.Handlers; +using BotSharp.Plugin.Membase.Interfaces; using Refit; namespace BotSharp.Plugin.Membase; @@ -37,6 +40,10 @@ public void RegisterDI(IServiceCollection services, IConfiguration config) _membaseCredential = config.GetValue("Membase:ApiKey") ?? string.Empty; _membaseProjectId = config.GetValue("Membase:ProjectId") ?? string.Empty; + +#if DEBUG + services.AddScoped, DemoRuleGraph>(); +#endif } public bool AttachMenu(List menu) diff --git a/src/Plugins/BotSharp.Plugin.Membase/Models/Requests/EdgeUpdateModel.cs b/src/Plugins/BotSharp.Plugin.Membase/Models/Requests/EdgeUpdateModel.cs index 9cc80123f..c660f485d 100644 --- a/src/Plugins/BotSharp.Plugin.Membase/Models/Requests/EdgeUpdateModel.cs +++ b/src/Plugins/BotSharp.Plugin.Membase/Models/Requests/EdgeUpdateModel.cs @@ -2,5 +2,6 @@ namespace BotSharp.Plugin.Membase.Models; public class EdgeUpdateModel { + public string? Id { get; set; } public object? Properties { get; set; } } diff --git a/src/Plugins/BotSharp.Plugin.Membase/Services/DemoRuleGraph.cs b/src/Plugins/BotSharp.Plugin.Membase/Services/DemoRuleGraph.cs new file mode 100644 index 000000000..105260448 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.Membase/Services/DemoRuleGraph.cs @@ -0,0 +1,322 @@ +using BotSharp.Abstraction.Graph; +using BotSharp.Abstraction.Graph.Models; +using BotSharp.Abstraction.Rules; +using BotSharp.Abstraction.Rules.Models; +using BotSharp.Abstraction.Rules.Options; +using BotSharp.Abstraction.Utilities; +using Microsoft.Extensions.Logging; +using System.Text.Json; + +namespace BotSharp.Plugin.Membase.Services; + +public class DemoRuleGraph : IRuleFlow +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + + public DemoRuleGraph( + IServiceProvider services, + ILogger logger) + { + _services = services; + _logger = logger; + } + + public string Name => "One Flow"; + + public async Task GetTopologyConfigAsync(RuleFlowConfigOptions? options = null) + { + var settings = _services.GetRequiredService(); + var apiKey = settings.ApiKey; + var projectId = settings.ProjectId; + + var topologyName = Name; + if (!string.IsNullOrEmpty(options?.TopologyName)) + { + topologyName = options.TopologyName; + } + + var foundInstance = settings.GraphInstances?.FirstOrDefault(x => x.Name.IsEqualTo(topologyName)); + var graphId = foundInstance?.Id ?? string.Empty; + var query = Uri.EscapeDataString("MATCH (a)-[r]->(b) WITH a, r, b WHERE a.agent = $agent AND a.trigger = $trigger AND b.agent = $agent AND b.trigger = $trigger RETURN a, r, b LIMIT 100"); + + return new RuleConfigModel + { + TopologyId = graphId, + TopologyName = foundInstance?.Name, + CustomParameters = JsonDocument.Parse(JsonSerializer.Serialize(new + { + htmlTag = "iframe", + appendParameterName = "parameters", + url = $"https://console.membase.dev/query-editor/{projectId}?graphId={graphId}&query={query}&token={apiKey}" + })) + }; + } + + public async Task GetTopologyAsync(string id, RuleFlowLoadOptions? options = null) + { + if (string.IsNullOrEmpty(id)) + { +#if DEBUG + return GetDefaultGraph(); +#else + return null; +#endif + } + + var query = options?.Query ?? string.Empty; + if (string.IsNullOrEmpty(query)) + { + query = $""" + MATCH (a)-[r]->(b) + WITH a, r, b + WHERE a.agent = $agent AND a.trigger = $trigger AND b.agent = $agent AND b.trigger = $trigger + RETURN a, r, b + LIMIT 100 + """; + } + + var args = new Dictionary(); + if (options?.Parameters != null) + { + foreach (var param in options.Parameters!) + { + if (param.Key == null || param.Value == null) + { + continue; + } + args[param.Key] = param.Value; + } + } + + try + { + var graphDb = _services.GetServices().First(x => x.Provider.IsEqualTo("membase")); + var result = await graphDb.ExecuteQueryAsync(query, options: new() + { + GraphId = id, + Arguments = args + }); + + if (result == null) + { + return null; + } + + var graph = BuildGraph(result); + return graph; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error when loading graph (id: {GraphId})", id); + return null; + } + } + + private RuleGraph BuildGraph(GraphQueryResult result) + { + var graph = RuleGraph.Init(); + if (result.Values.IsNullOrEmpty()) + { + return graph; + } + + foreach (var item in result.Values) + { + // Try to deserialize nodes and edge from the dictionary + if (!item.TryGetValue("a", out var sourceNodeElement) || + !item.TryGetValue("b", out var targetNodeElement) || + !item.TryGetValue("r", out var edgeElement)) + { + continue; + } + + // Parse source node + var sourceNodeId = sourceNodeElement.GetProperty("id").GetString(); + var sourceNodeLabels = sourceNodeElement.TryGetProperty("labels", out var sLabels) + ? sLabels.EnumerateArray().Select(x => x.GetString() ?? "").ToList() + : []; + var sourceNodeProps = sourceNodeElement.TryGetProperty("properties", out var sProps) + ? sProps + : default; + var sourceNodeWeight = sourceNodeElement.TryGetProperty("weight", out var sNodeWeight) && sNodeWeight.ValueKind == JsonValueKind.Number + ? sNodeWeight.GetDouble() + : 1.0; + + // Parse target node + var targetNodeId = targetNodeElement.GetProperty("id").GetString(); + var targetNodeLabels = targetNodeElement.TryGetProperty("labels", out var tLabels) + ? tLabels.EnumerateArray().Select(x => x.GetString() ?? "").ToList() + : []; + var targetNodeProps = targetNodeElement.TryGetProperty("properties", out var tProps) + ? tProps + : default; + var targetNodeWeight = targetNodeElement.TryGetProperty("weight", out var tNodeWeight) && tNodeWeight.ValueKind == JsonValueKind.Number + ? tNodeWeight.GetDouble() + : 1.0; + + // Parse edge + var edgeId = edgeElement.GetProperty("id").GetString(); + var edgeProps = edgeElement.TryGetProperty("properties", out var eProps) + ? eProps + : default; + var edgeWeight = edgeElement.TryGetProperty("weight", out var eWeight) && eWeight.ValueKind == JsonValueKind.Number + ? eWeight.GetDouble() + : 1.0; + + // Create source node + var sourceNode = new RuleNode() + { + Id = sourceNodeId ?? Guid.NewGuid().ToString(), + Labels = sourceNodeLabels, + Weight = sourceNodeWeight, + Name = GetGraphItemAttribute(sourceNodeProps, key: "name", defaultValue: "node"), + Type = GetGraphItemAttribute(sourceNodeProps, key: "type", defaultValue: "action"), + Purpose = GetGraphItemAttribute(sourceNodeProps, key: "purpose", defaultValue: "unknown"), + Config = GetConfig(sourceNodeProps) + }; + + // Create target node + var targetNode = new RuleNode() + { + Id = targetNodeId ?? Guid.NewGuid().ToString(), + Labels = targetNodeLabels, + Weight = targetNodeWeight, + Name = GetGraphItemAttribute(targetNodeProps, key: "name", defaultValue: "node"), + Type = GetGraphItemAttribute(targetNodeProps, key: "type", defaultValue: "action"), + Purpose = GetGraphItemAttribute(targetNodeProps, key: "purpose", defaultValue: "unknown"), + Config = GetConfig(targetNodeProps) + }; + + // Create edge payload + var edgePayload = new GraphItemPayload() + { + Id = edgeId ?? Guid.NewGuid().ToString(), + Name = GetGraphItemAttribute(edgeProps, key: "name", defaultValue: "edge"), + Type = GetGraphItemAttribute(edgeProps, key: "type", defaultValue: "next"), + Purpose = GetGraphItemAttribute(edgeProps, key: "purpose", defaultValue: "unknown"), + Weight = edgeWeight, + Config = GetConfig(edgeProps) + }; + + // Add edge to graph + graph.AddEdge(sourceNode, targetNode, edgePayload); + } + + return graph; + } + + private string GetGraphItemAttribute(JsonElement? properties, string key, string defaultValue) + { + if (properties == null || properties.Value.ValueKind == JsonValueKind.Undefined) + { + return defaultValue; + } + + if (properties.Value.TryGetProperty(key, out var name) && name.ValueKind == JsonValueKind.String) + { + return name.GetString() ?? defaultValue; + } + + return defaultValue; + } + + private Dictionary GetConfig(JsonElement? properties) + { + var config = new Dictionary(); + + if (properties == null || properties.Value.ValueKind == JsonValueKind.Undefined) + { + return config; + } + + // Convert all properties to config dictionary + foreach (var prop in properties.Value.EnumerateObject()) + { + config[prop.Name] = prop.Value.ConvertToString(); + } + + return config; + } + + private RuleGraph GetDefaultGraph() + { + var graph = RuleGraph.Init(); + var root = new RuleNode + { + Name = "start", + Type = "start", + }; + + var end = new RuleNode + { + Name = "end", + Type = "end", + }; + + var node1 = new RuleNode + { + Name = "http_request", + Type = "action", + Config = new() + { + ["http_method"] = "GET", + ["http_url"] = "https://dummy.restapiexample.com/api/v1/employees" + } + }; + + var node2 = new RuleNode + { + Name = "http_request", + Type = "action", + Config = new() + { + ["http_method"] = "GET", + ["http_url"] = "https://dummy.restapiexample.com/api/v1/employee/1" + } + }; + + var node3 = new RuleNode + { + Name = "http_request", + Type = "action", + Config = new() + { + ["http_method"] = "GET", + ["http_url"] = "https://dummy.restapiexample.com/api/v1/employee/2" + } + }; + + graph.AddEdge(root, node1, payload: new() + { + Name = "edge", + Type = "next" + }); + + graph.AddEdge(node1, node2, payload: new() + { + Name = "edge", + Type = "next" + }); + + graph.AddEdge(node1, node3, payload: new() + { + Name = "edge", + Type = "next" + }); + + graph.AddEdge(node2, node3, payload: new() + { + Name = "edge", + Type = "next" + }); + + graph.AddEdge(node3, end, payload: new() + { + Name = "edge", + Type = "next" + }); + + return graph; + } +} diff --git a/src/Plugins/BotSharp.Plugin.Membase/Services/MembaseService.cs b/src/Plugins/BotSharp.Plugin.Membase/Services/MembaseService.cs index 380e9b703..d5f79cb1a 100644 --- a/src/Plugins/BotSharp.Plugin.Membase/Services/MembaseService.cs +++ b/src/Plugins/BotSharp.Plugin.Membase/Services/MembaseService.cs @@ -1,4 +1,5 @@ using BotSharp.Abstraction.Graph.Models; +using BotSharp.Plugin.Membase.Interfaces; using BotSharp.Plugin.Membase.Models.Graph; namespace BotSharp.Plugin.Membase.Services; diff --git a/src/Plugins/BotSharp.Plugin.Membase/Settings/MembaseSettings.cs b/src/Plugins/BotSharp.Plugin.Membase/Settings/MembaseSettings.cs index 77bf58351..c1370746d 100644 --- a/src/Plugins/BotSharp.Plugin.Membase/Settings/MembaseSettings.cs +++ b/src/Plugins/BotSharp.Plugin.Membase/Settings/MembaseSettings.cs @@ -5,4 +5,23 @@ public class MembaseSettings public string Host { get; set; } = "localhost"; public string ProjectId { get; set; } = string.Empty; public string ApiKey { get; set; } = string.Empty; + public GraphInstance[] GraphInstances { get; set; } = []; } + +public class GraphInstance +{ + /// + /// Graph id + /// + public string Id { get; set; } + + /// + /// Graph name + /// + public string Name { get; set; } + + /// + /// Graph description + /// + public string Description { get; set; } +} \ No newline at end of file diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs index 4205fdc46..b481394a2 100644 --- a/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs +++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs @@ -7,7 +7,7 @@ public class AgentRuleMongoElement { public string TriggerName { get; set; } = default!; public bool Disabled { get; set; } - public string Criteria { get; set; } = default!; + public RuleConfigMongoModel? Config { get; set; } public static AgentRuleMongoElement ToMongoElement(AgentRule rule) { @@ -15,7 +15,7 @@ public static AgentRuleMongoElement ToMongoElement(AgentRule rule) { TriggerName = rule.TriggerName, Disabled = rule.Disabled, - Criteria = rule.Criteria + Config = RuleConfigMongoModel.ToMongoModel(rule.Config) }; } @@ -25,7 +25,39 @@ public static AgentRule ToDomainElement(AgentRuleMongoElement rule) { TriggerName = rule.TriggerName, Disabled = rule.Disabled, - Criteria = rule.Criteria + Config = RuleConfigMongoModel.ToDomainModel(rule.Config) + }; + } +} + +[BsonIgnoreExtraElements(Inherited = true)] +public class RuleConfigMongoModel +{ + public string? TopologyName { get; set; } + + public static RuleConfigMongoModel? ToMongoModel(RuleConfig? config) + { + if (config == null) + { + return null; + } + + return new RuleConfigMongoModel + { + TopologyName = config.TopologyName + }; + } + + public static RuleConfig? ToDomainModel(RuleConfigMongoModel? config) + { + if (config == null) + { + return null; + } + + return new RuleConfig + { + TopologyName = config.TopologyName }; } } diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs index 8a176a3fd..a003b3833 100644 --- a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs +++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs @@ -648,8 +648,7 @@ public async Task> TruncateConversation(string conversationId, stri continue; } - var values = state.Values.Where(x => x.MessageId != messageId) - .Where(x => x.UpdateTime < refTime) + var values = state.Values.Where(x => x.MessageId != messageId && x.UpdateTime < refTime) .ToList(); if (values.Count == 0) continue; diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj b/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj new file mode 100644 index 000000000..4a8f3ff20 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj @@ -0,0 +1,22 @@ + + + + $(TargetFramework) + enable + $(LangVersion) + $(BotSharpVersion) + $(GeneratePackageOnBuild) + $(GenerateDocumentationFile) + $(SolutionDir)packages + + + + + + + + + + + + diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs new file mode 100644 index 000000000..81c7de270 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs @@ -0,0 +1,73 @@ +using Microsoft.Extensions.ObjectPool; +using RabbitMQ.Client; + +namespace BotSharp.Plugin.RabbitMQ.Connections; + +public class RabbitMQChannelPool +{ + private readonly ObjectPool _pool; + private readonly ILogger _logger; + private readonly int _tryLimit = 3; + + public RabbitMQChannelPool( + IServiceProvider services, + IRabbitMQConnection mqConnection) + { + _logger = services.GetRequiredService().CreateLogger(); + var poolProvider = new DefaultObjectPoolProvider(); + var policy = new ChannelPoolPolicy(mqConnection.Connection); + _pool = poolProvider.Create(policy); + } + + public IChannel Get() + { + var count = 0; + var channel = _pool.Get(); + + while (count < _tryLimit && channel.IsClosed) + { + channel.Dispose(); + channel = _pool.Get(); + count++; + } + + if (channel.IsClosed) + { + _logger.LogWarning($"No open channel from the pool after {_tryLimit} retries."); + } + + return channel; + } + + public void Return(IChannel channel) + { + if (channel.IsOpen) + { + _pool.Return(channel); + } + else + { + channel.Dispose(); + } + } +} + +internal class ChannelPoolPolicy : IPooledObjectPolicy +{ + private readonly IConnection _connection; + + public ChannelPoolPolicy(IConnection connection) + { + _connection = connection; + } + + public IChannel Create() + { + return _connection.CreateChannelAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + } + + public bool Return(IChannel obj) + { + return true; + } +} \ No newline at end of file diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs new file mode 100644 index 000000000..989c0a7b7 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs @@ -0,0 +1,13 @@ +using System.Collections.Concurrent; + +namespace BotSharp.Plugin.RabbitMQ.Connections; + +public static class RabbitMQChannelPoolFactory +{ + private static readonly ConcurrentDictionary _poolDict = new(); + + public static RabbitMQChannelPool GetChannelPool(IServiceProvider services, IRabbitMQConnection rabbitMQConnection) + { + return _poolDict.GetOrAdd(rabbitMQConnection.Connection.ToString()!, key => new RabbitMQChannelPool(services, rabbitMQConnection)); + } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs new file mode 100644 index 000000000..dac9e8c07 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs @@ -0,0 +1,154 @@ +using Polly; +using Polly.Retry; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using System.Threading; + +namespace BotSharp.Plugin.RabbitMQ.Connections; + +public class RabbitMQConnection : IRabbitMQConnection +{ + private readonly RabbitMQSettings _settings; + private readonly IConnectionFactory _connectionFactory; + private readonly SemaphoreSlim _lock = new(initialCount: 1, maxCount: 1); + private readonly ILogger _logger; + private readonly int _retryCount = 5; + + private IConnection _connection; + private bool _disposed = false; + + public RabbitMQConnection( + RabbitMQSettings settings, + ILogger logger) + { + _settings = settings; + _logger = logger; + _connectionFactory = new ConnectionFactory + { + HostName = settings.HostName, + Port = settings.Port, + UserName = settings.UserName, + Password = settings.Password, + VirtualHost = settings.VirtualHost, + ConsumerDispatchConcurrency = 1, + AutomaticRecoveryEnabled = true, + HandshakeContinuationTimeout = TimeSpan.FromSeconds(20) + }; + } + + public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed; + + public IConnection Connection => _connection; + + public async Task CreateChannelAsync() + { + if (!IsConnected) + { + throw new InvalidOperationException("Rabbit MQ is not connectioned."); + } + return await _connection.CreateChannelAsync(); + } + + public async Task ConnectAsync() + { + await _lock.WaitAsync(); + + try + { + if (IsConnected) + { + return true; + } + + var policy = BuildRetryPolicy(); + await policy.Execute(async () => + { + _connection = await _connectionFactory.CreateConnectionAsync(); + }); + + if (IsConnected) + { + _connection.ConnectionShutdownAsync += OnConnectionShutdownAsync; + _connection.CallbackExceptionAsync += OnCallbackExceptionAsync; + _connection.ConnectionBlockedAsync += OnConnectionBlockedAsync; + _logger.LogInformation($"Rabbit MQ client connection success. host: {_connection.Endpoint.HostName}, port: {_connection.Endpoint.Port}, localPort:{_connection.LocalPort}"); + return true; + } + _logger.LogError("Rabbit MQ client connection error."); + return false; + } + finally + { + _lock.Release(); + } + + } + + private RetryPolicy BuildRetryPolicy() + { + return Policy.Handle().WaitAndRetry( + _retryCount, + retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), + (ex, time) => + { + _logger.LogError(ex, $"RabbitMQ cannot build connection: after {time.TotalSeconds:n1}s"); + }); + } + + private Task OnConnectionShutdownAsync(object sender, ShutdownEventArgs e) + { + if (_disposed) + { + return Task.CompletedTask; + } + + _logger.LogError($"Rabbit MQ connection is shutdown. {e}."); + return Task.CompletedTask; + } + + private Task OnCallbackExceptionAsync(object sender, CallbackExceptionEventArgs e) + { + if (_disposed) + { + return Task.CompletedTask; + } + + _logger.LogError($"Rabbit MQ connection throw exception. Trying to reconnect, {e.Exception}."); + return Task.CompletedTask; + } + + private Task OnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs e) + { + if (_disposed) + { + return Task.CompletedTask; + } + + _logger.LogError($"Rabbit MQ connection is shutdown. Trying to reconnect, {e.Reason}."); + return Task.CompletedTask; + } + + + public void Dispose() + { + if (_disposed) + { + return; + } + + _logger.LogWarning("Start disposing Rabbit MQ connection."); + + try + { + _connection.Dispose(); + _disposed = true; + _logger.LogWarning("Disposed Rabbit MQ connection."); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when disposing Rabbit MQ connection"); + } + + GC.SuppressFinalize(this); + } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs new file mode 100644 index 000000000..36af0df90 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs @@ -0,0 +1,24 @@ +namespace BotSharp.Plugin.RabbitMQ.Consumers; + +public class DummyMessageConsumer : MQConsumerBase +{ + public override object Config => new RabbitMQConsumerConfig + { + ExchangeName = "my.exchange", + QueueName = "dummy.queue", + RoutingKey = "my.routing" + }; + + public DummyMessageConsumer( + IServiceProvider services, + ILogger logger) + : base(services, logger) + { + } + + public override async Task HandleMessageAsync(string channel, string data) + { + _logger.LogCritical($"Received delayed dummy message data: {data}"); + return await Task.FromResult(true); + } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs new file mode 100644 index 000000000..f6040dcd7 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs @@ -0,0 +1,25 @@ +namespace BotSharp.Plugin.RabbitMQ.Consumers; + +public class ScheduledMessageConsumer : MQConsumerBase +{ + public override object Config => new RabbitMQConsumerConfig + { + ExchangeName = "my.exchange", + QueueName = "scheduled.queue", + RoutingKey = "my.routing" + }; + + public ScheduledMessageConsumer( + IServiceProvider services, + ILogger logger) + : base(services, logger) + { + } + + public override async Task HandleMessageAsync(string channel, string data) + { + _logger.LogCritical($"Received delayed scheduled message data: {data}"); + return await Task.FromResult(true); + } +} + diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs new file mode 100644 index 000000000..802e4fa1b --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs @@ -0,0 +1,89 @@ +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; + +namespace BotSharp.Plugin.RabbitMQ.Controllers; + +[Authorize] +[ApiController] +public class RabbitMQController : ControllerBase +{ + private readonly IServiceProvider _services; + private readonly IMQService _mqService; + private readonly ILogger _logger; + + public RabbitMQController( + IServiceProvider services, + IMQService mqService, + ILogger logger) + { + _services = services; + _mqService = mqService; + _logger = logger; + } + + /// + /// Publish a scheduled message to be delivered after a delay + /// + /// The scheduled message request + [HttpPost("/message-queue/publish")] + public async Task PublishScheduledMessage([FromBody] PublishScheduledMessageRequest request) + { + if (request == null) + { + return BadRequest(new PublishMessageResponse { Success = false, Error = "Request body is required." }); + } + + try + { + var payload = new ScheduledMessagePayload + { + Name = request.Name ?? "Hello" + }; + + var success = await _mqService.PublishAsync( + payload, + options: new() + { + TopicName = "my.exchange", + RoutingKey = "my.routing", + DelayMilliseconds = request.DelayMilliseconds ?? 10000, + MessageId = request.MessageId + }); + return Ok(new { Success = success }); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to publish scheduled message"); + return StatusCode(StatusCodes.Status500InternalServerError, + new PublishMessageResponse { Success = false, Error = ex.Message }); + } + } + + /// + /// Unsubscribe a consumer + /// + /// + /// + [HttpPost("/message-queue/unsubscribe/consumer")] + public async Task UnSubscribeConsuer([FromBody] UnsubscribeConsumerRequest request) + { + if (request == null) + { + return BadRequest(new { Success = false, Error = "Request body is required." }); + } + + try + { + var success = await _mqService.UnsubscribeAsync(request.Name); + return Ok(new { Success = success }); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Failed to unsubscribe consumer {request.Name}"); + return StatusCode(StatusCodes.Status500InternalServerError, + new { Success = false, Error = ex.Message }); + } + } +} + diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs new file mode 100644 index 000000000..cb89c2976 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs @@ -0,0 +1,11 @@ +using RabbitMQ.Client; + +namespace BotSharp.Plugin.RabbitMQ.Interfaces; + +public interface IRabbitMQConnection : IDisposable +{ + bool IsConnected { get; } + IConnection Connection { get; } + Task CreateChannelAsync(); + Task ConnectAsync(); +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs new file mode 100644 index 000000000..ad655b795 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs @@ -0,0 +1,31 @@ +namespace BotSharp.Plugin.RabbitMQ.Models; + +/// +/// Request model for publishing a scheduled message +/// +public class PublishScheduledMessageRequest +{ + public string? Name { get; set; } + + public long? DelayMilliseconds { get; set; } + + public string? MessageId { get; set; } +} + + +/// +/// Response model for publish operations +/// +public class PublishMessageResponse +{ + /// + /// Whether the message was successfully published + /// + public bool Success { get; set; } + + /// + /// Error message if publish failed + /// + public string? Error { get; set; } +} + diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs new file mode 100644 index 000000000..93754d455 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs @@ -0,0 +1,24 @@ +namespace BotSharp.Plugin.RabbitMQ.Models; + +internal class RabbitMQConsumerConfig +{ + /// + /// The exchange name (topic in some MQ systems). + /// + internal string ExchangeName { get; set; } = "rabbitmq.exchange"; + + /// + /// The queue name (subscription in some MQ systems). + /// + internal string QueueName { get; set; } = "rabbitmq.queue"; + + /// + /// The routing key (filter in some MQ systems). + /// + internal string RoutingKey { get; set; } = "rabbitmq.routing"; + + /// + /// Additional arguments for the consumer configuration. + /// + internal Dictionary Arguments { get; set; } = new(); +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs new file mode 100644 index 000000000..2180fb2d7 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs @@ -0,0 +1,9 @@ +namespace BotSharp.Plugin.RabbitMQ.Models; + +/// +/// Payload for scheduled/delayed messages +/// +public class ScheduledMessagePayload +{ + public string Name { get; set; } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs new file mode 100644 index 000000000..509d432b2 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs @@ -0,0 +1,6 @@ +namespace BotSharp.Plugin.RabbitMQ.Models; + +public class UnsubscribeConsumerRequest +{ + public string Name { get; set; } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs new file mode 100644 index 000000000..ff45dfe48 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs @@ -0,0 +1,56 @@ +using BotSharp.Plugin.RabbitMQ.Services; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.Configuration; + +namespace BotSharp.Plugin.RabbitMQ; + +public class RabbitMQPlugin : IBotSharpAppPlugin +{ + public string Id => "3f93407f-3c37-4e25-be28-142a2da9b514"; + public string Name => "RabbitMQ"; + public string Description => "Handle AI messages in RabbitMQ."; + public string IconUrl => "https://icon-library.com/images/message-queue-icon/message-queue-icon-13.jpg"; + + public void RegisterDI(IServiceCollection services, IConfiguration config) + { + var settings = new RabbitMQSettings(); + config.Bind("RabbitMQ", settings); + services.AddSingleton(settings); + + var mqSettings = new MessageQueueSettings(); + config.Bind("MessageQueue", mqSettings); + + if (mqSettings.Enabled && mqSettings.Provider.IsEqualTo("RabbitMQ")) + { + services.AddSingleton(); + services.AddSingleton(); + } + } + + public void Configure(IApplicationBuilder app) + { +#if DEBUG + var sp = app.ApplicationServices; + var mqSettings = sp.GetRequiredService(); + + if (mqSettings.Enabled && mqSettings.Provider.IsEqualTo("RabbitMQ")) + { + var mqService = sp.GetRequiredService(); + var loggerFactory = sp.GetRequiredService(); + + // Create and subscribe the consumer using the abstract interface + var scheduledConsumer = new ScheduledMessageConsumer(sp, loggerFactory.CreateLogger()); + mqService.SubscribeAsync(nameof(ScheduledMessageConsumer), scheduledConsumer) + .ConfigureAwait(false) + .GetAwaiter() + .GetResult(); + + var dummyConsumer = new DummyMessageConsumer(sp, loggerFactory.CreateLogger()); + mqService.SubscribeAsync(nameof(DummyMessageConsumer), dummyConsumer) + .ConfigureAwait(false) + .GetAwaiter() + .GetResult(); + } +#endif + } +} \ No newline at end of file diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs new file mode 100644 index 000000000..0117bad14 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs @@ -0,0 +1,318 @@ +using Polly; +using Polly.Retry; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using System.Collections.Concurrent; + +namespace BotSharp.Plugin.RabbitMQ.Services; + +public class RabbitMQService : IMQService +{ + private readonly IRabbitMQConnection _mqConnection; + private readonly IServiceProvider _services; + private readonly ILogger _logger; + + private readonly int _retryCount = 5; + private bool _disposed = false; + private static readonly ConcurrentDictionary _consumers = []; + + public RabbitMQService( + IRabbitMQConnection mqConnection, + IServiceProvider services, + ILogger logger) + { + _mqConnection = mqConnection; + _services = services; + _logger = logger; + } + + public async Task SubscribeAsync(string key, IMQConsumer consumer) + { + if (_consumers.ContainsKey(key)) + { + _logger.LogWarning($"Consumer with key '{key}' is already subscribed."); + return false; + } + + var registration = await CreateConsumerRegistrationAsync(consumer); + if (registration != null && _consumers.TryAdd(key, registration)) + { + var config = consumer.Config as RabbitMQConsumerConfig ?? new(); + _logger.LogInformation($"Consumer '{key}' subscribed to queue '{config.QueueName}'."); + return true; + } + + return false; + } + + public async Task UnsubscribeAsync(string key) + { + if (!_consumers.TryRemove(key, out var registration)) + { + return false; + } + + try + { + if (registration.Channel != null) + { + registration.Channel.Dispose(); + } + registration.Consumer.Dispose(); + _logger.LogInformation($"Consumer '{key}' unsubscribed."); + return true; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error unsubscribing consumer '{key}'."); + return false; + } + } + + private async Task CreateConsumerRegistrationAsync(IMQConsumer consumer) + { + try + { + var channel = await CreateChannelAsync(consumer); + + var config = consumer.Config as RabbitMQConsumerConfig ?? new(); + var registration = new ConsumerRegistration(consumer, channel); + + var asyncConsumer = new AsyncEventingBasicConsumer(channel); + asyncConsumer.ReceivedAsync += async (sender, eventArgs) => + { + await ConsumeEventAsync(registration, eventArgs); + }; + + await channel.BasicConsumeAsync( + queue: config.QueueName, + autoAck: false, + consumer: asyncConsumer); + + _logger.LogWarning($"RabbitMQ consuming queue '{config.QueueName}'."); + return registration; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when register consumer in RabbitMQ."); + return null; + } + } + + private async Task CreateChannelAsync(IMQConsumer consumer) + { + if (!_mqConnection.IsConnected) + { + await _mqConnection.ConnectAsync(); + } + + var config = consumer.Config as RabbitMQConsumerConfig ?? new(); + var channel = await _mqConnection.CreateChannelAsync(); + _logger.LogWarning($"Created RabbitMQ channel {channel.ChannelNumber} for queue '{config.QueueName}'"); + + var args = new Dictionary + { + ["x-delayed-type"] = "direct" + }; + + if (config.Arguments != null) + { + foreach (var kvp in config.Arguments) + { + args[kvp.Key] = kvp.Value; + } + } + + await channel.ExchangeDeclareAsync( + exchange: config.ExchangeName, + type: "x-delayed-message", + durable: true, + autoDelete: false, + arguments: args); + + await channel.QueueDeclareAsync( + queue: config.QueueName, + durable: true, + exclusive: false, + autoDelete: false); + + await channel.QueueBindAsync( + queue: config.QueueName, + exchange: config.ExchangeName, + routingKey: config.RoutingKey); + + return channel; + } + + private async Task ConsumeEventAsync(ConsumerRegistration registration, BasicDeliverEventArgs eventArgs) + { + var data = string.Empty; + var config = registration.Consumer.Config as RabbitMQConsumerConfig ?? new(); + + try + { + data = Encoding.UTF8.GetString(eventArgs.Body.Span); + _logger.LogInformation($"Message received on '{config.QueueName}', id: {eventArgs.BasicProperties?.MessageId}, data: {data}"); + + var isHandled = await registration.Consumer.HandleMessageAsync(config.QueueName, data); + if (registration.Channel?.IsOpen == true) + { + if (isHandled) + { + await registration.Channel.BasicAckAsync(eventArgs.DeliveryTag, multiple: false); + } + else + { + await registration.Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: false); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error consuming message on queue '{config.QueueName}': {data}"); + if (registration.Channel?.IsOpen == true) + { + await registration.Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: false); + } + } + } + + public async Task PublishAsync(T payload, MQPublishOptions options) + { + try + { + if (options == null) + { + return false; + } + + if (!_mqConnection.IsConnected) + { + await _mqConnection.ConnectAsync(); + } + + var isPublished = false; + var policy = BuildRetryPolicy(); + await policy.Execute(async () => + { + var channelPool = RabbitMQChannelPoolFactory.GetChannelPool(_services, _mqConnection); + var channel = channelPool.Get(); + + try + { + var args = new Dictionary + { + ["x-delayed-type"] = "direct" + }; + + if (!options.Arguments.IsNullOrEmpty()) + { + foreach (var kvp in options.Arguments) + { + args[kvp.Key] = kvp.Value; + } + } + + await channel.ExchangeDeclareAsync( + exchange: options.TopicName, + type: "x-delayed-message", + durable: true, + autoDelete: false, + arguments: args); + + var messageId = options.MessageId ?? Guid.NewGuid().ToString(); + var message = new MQMessage(payload, messageId); + var body = ConvertToBinary(message, options.JsonOptions); + var properties = new BasicProperties + { + MessageId = messageId, + DeliveryMode = DeliveryModes.Persistent, + Headers = new Dictionary + { + ["x-delay"] = options.DelayMilliseconds + } + }; + + await channel.BasicPublishAsync( + exchange: options.TopicName, + routingKey: options.RoutingKey, + mandatory: true, + basicProperties: properties, + body: body); + + isPublished = true; + } + catch (Exception) + { + throw; + } + finally + { + channelPool.Return(channel); + } + }); + + return isPublished; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when RabbitMQ publish message."); + return false; + } + } + + private RetryPolicy BuildRetryPolicy() + { + return Policy.Handle().WaitAndRetry( + _retryCount, + retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), + (ex, time) => + { + _logger.LogError(ex, $"RabbitMQ publish error: after {time.TotalSeconds:n1}s"); + }); + } + + private static byte[] ConvertToBinary(T data, JsonSerializerOptions? jsonOptions = null) + { + var jsonStr = JsonSerializer.Serialize(data, jsonOptions); + var body = Encoding.UTF8.GetBytes(jsonStr); + return body; + } + + public void Dispose() + { + if (_disposed) + { + return; + } + + _logger.LogWarning($"Disposing {nameof(RabbitMQService)}"); + + foreach (var item in _consumers) + { + if (item.Value.Channel != null) + { + item.Value.Channel.Dispose(); + } + item.Value.Consumer.Dispose(); + } + + _disposed = true; + GC.SuppressFinalize(this); + } + + /// + /// Internal class to track consumer registrations with their RabbitMQ channels. + /// + private class ConsumerRegistration + { + public IMQConsumer Consumer { get; } + public IChannel? Channel { get; } + + public ConsumerRegistration(IMQConsumer consumer, IChannel? channel) + { + Consumer = consumer; + Channel = channel; + } + } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs new file mode 100644 index 000000000..0e61b5c71 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs @@ -0,0 +1,10 @@ +namespace BotSharp.Plugin.RabbitMQ.Settings; + +public class RabbitMQSettings +{ + public string HostName { get; set; } = "localhost"; + public int Port { get; set; } = 5672; + public string UserName { get; set; } = "guest"; + public string Password { get; set; } = "guest"; + public string VirtualHost { get; set; } = "/"; +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs new file mode 100644 index 000000000..0a8a8c3a5 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs @@ -0,0 +1,38 @@ +global using System; +global using System.Collections.Generic; +global using System.Text; +global using System.Linq; +global using System.Text.Json; +global using System.Net.Mime; +global using System.Threading.Tasks; +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Logging; +global using BotSharp.Abstraction.Agents; +global using BotSharp.Abstraction.Conversations; +global using BotSharp.Abstraction.Plugins; +global using BotSharp.Abstraction.Conversations.Models; +global using BotSharp.Abstraction.Functions; +global using BotSharp.Abstraction.Agents.Models; +global using BotSharp.Abstraction.Agents.Enums; +global using BotSharp.Abstraction.Files.Enums; +global using BotSharp.Abstraction.Files.Models; +global using BotSharp.Abstraction.Files.Converters; +global using BotSharp.Abstraction.Files; +global using BotSharp.Abstraction.MLTasks; +global using BotSharp.Abstraction.Utilities; +global using BotSharp.Abstraction.Agents.Settings; +global using BotSharp.Abstraction.Functions.Models; +global using BotSharp.Abstraction.Repositories; +global using BotSharp.Abstraction.Settings; +global using BotSharp.Abstraction.Messaging; +global using BotSharp.Abstraction.Messaging.Models.RichContent; +global using BotSharp.Abstraction.Options; +global using BotSharp.Abstraction.Models; +global using BotSharp.Abstraction.Infrastructures.MessageQueues; +global using BotSharp.Abstraction.Infrastructures.MessageQueues.Models; + +global using BotSharp.Plugin.RabbitMQ.Settings; +global using BotSharp.Plugin.RabbitMQ.Models; +global using BotSharp.Plugin.RabbitMQ.Interfaces; +global using BotSharp.Plugin.RabbitMQ.Consumers; +global using BotSharp.Plugin.RabbitMQ.Connections; \ No newline at end of file diff --git a/src/WebStarter/appsettings.json b/src/WebStarter/appsettings.json index e4ad7bf57..fb7367dd2 100644 --- a/src/WebStarter/appsettings.json +++ b/src/WebStarter/appsettings.json @@ -1006,6 +1006,7 @@ "Language": "en" } }, + "A2AIntegration": { "Enabled": true, "DefaultTimeoutSeconds": 30, @@ -1018,12 +1019,27 @@ } ] }, + + "MessageQueue": { + "Enabled": false, + "Provider": "RabbitMQ" + }, + + "RabbitMQ": { + "HostName": "localhost", + "Port": 5672, + "UserName": "guest", + "Password": "guest", + "VirtualHost": "/" + }, + "PluginLoader": { "Assemblies": [ "BotSharp.Core", "BotSharp.Core.A2A", "BotSharp.Core.SideCar", "BotSharp.Core.Crontab", + "BotSharp.Core.Rules", "BotSharp.Core.Realtime", "BotSharp.Logger", "BotSharp.Plugin.MongoStorage",