diff --git a/Directory.Packages.props b/Directory.Packages.props
index f307df8e4..5865fc577 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -10,6 +10,7 @@
+
diff --git a/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs b/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs
index 75c0985a8..3ae15d104 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs
@@ -8,6 +8,13 @@ public class AgentRule
[JsonPropertyName("disabled")]
public bool Disabled { get; set; }
- [JsonPropertyName("criteria")]
- public string Criteria { get; set; } = string.Empty;
+ [JsonPropertyName("config")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public RuleConfig? Config { get; set; }
}
+
+public class RuleConfig
+{
+ [JsonPropertyName("topology_name")]
+ public string? TopologyName { get; set; }
+}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs b/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs
index e2ec38a5a..8442eae45 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs
@@ -4,4 +4,5 @@ public class CodeExecutionContext
{
public AgentCodeScript CodeScript { get; set; }
public List Arguments { get; set; } = [];
+ public string? InvokeFrom { get; set; }
}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs
new file mode 100644
index 000000000..4df43dd0e
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs
@@ -0,0 +1,21 @@
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues;
+
+///
+/// Abstract interface for message queue consumers.
+/// Implement this interface to create consumers that are independent of MQ products (e.g., RabbitMQ, Kafka, Azure Service Bus).
+///
+public interface IMQConsumer : IDisposable
+{
+ ///
+ /// Gets the consumer config
+ ///
+ object Config { get; }
+
+ ///
+ /// Handles the received message from the queue.
+ ///
+ /// The consumer channel identifier
+ /// The message data as string
+ /// True if the message was handled successfully, false otherwise
+ Task HandleMessageAsync(string channel, string data);
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs
new file mode 100644
index 000000000..672e539c1
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs
@@ -0,0 +1,31 @@
+using BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues;
+
+public interface IMQService : IDisposable
+{
+ ///
+ /// Subscribe a consumer to the message queue.
+ /// The consumer will be initialized with the appropriate MQ-specific infrastructure.
+ ///
+ /// Unique identifier for the consumer
+ /// The consumer implementing IMQConsumer interface
+ /// Task representing the async subscription operation
+ Task SubscribeAsync(string key, IMQConsumer consumer);
+
+ ///
+ /// Unsubscribe a consumer from the message queue.
+ ///
+ /// Unique identifier for the consumer
+ /// Task representing the async unsubscription operation
+ Task UnsubscribeAsync(string key);
+
+ ///
+ /// Publish payload to message queue
+ ///
+ ///
+ ///
+ ///
+ ///
+ Task PublishAsync(T payload, MQPublishOptions options);
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs
new file mode 100644
index 000000000..cd66be1fd
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs
@@ -0,0 +1,51 @@
+using Microsoft.Extensions.Logging;
+
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues;
+
+///
+/// Abstract base class for RabbitMQ consumers.
+/// Implements IMQConsumer to allow other projects to define consumers independently of RabbitMQ.
+/// The RabbitMQ-specific infrastructure is handled by RabbitMQService.
+///
+public abstract class MQConsumerBase : IMQConsumer
+{
+ protected readonly IServiceProvider _services;
+ protected readonly ILogger _logger;
+ private bool _disposed = false;
+
+ ///
+ /// Gets the consumer config for this consumer.
+ /// Override this property to customize exchange, queue and routing configuration.
+ ///
+ public abstract object Config { get; }
+
+ protected MQConsumerBase(
+ IServiceProvider services,
+ ILogger logger)
+ {
+ _services = services;
+ _logger = logger;
+ }
+
+ ///
+ /// Handles the received message from the queue.
+ ///
+ /// The consumer channel identifier
+ /// The message data as string
+ /// True if the message was handled successfully, false otherwise
+ public abstract Task HandleMessageAsync(string channel, string data);
+
+ public void Dispose()
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ var consumerName = GetType().Name;
+ _logger.LogWarning($"Disposing consumer: {consumerName}");
+ _disposed = true;
+ GC.SuppressFinalize(this);
+ }
+}
+
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs
new file mode 100644
index 000000000..b08a5a054
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs
@@ -0,0 +1,7 @@
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues;
+
+public class MessageQueueSettings
+{
+ public bool Enabled { get; set; }
+ public string Provider { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs
new file mode 100644
index 000000000..e940aff01
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs
@@ -0,0 +1,14 @@
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+
+public class MQMessage
+{
+ public MQMessage(T payload, string messageId)
+ {
+ Payload = payload;
+ MessageId = messageId;
+ }
+
+ public T Payload { get; set; }
+ public string MessageId { get; set; }
+ public DateTime CreateDate { get; set; } = DateTime.UtcNow;
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs
new file mode 100644
index 000000000..dead523be
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs
@@ -0,0 +1,40 @@
+using System.Text.Json;
+
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+
+///
+/// Configuration options for publishing messages to a message queue.
+/// These options are MQ-product agnostic and can be adapted by different implementations.
+///
+public class MQPublishOptions
+{
+ ///
+ /// The topic name (exchange in RabbitMQ, topic in Kafka/Azure Service Bus).
+ ///
+ public string TopicName { get; set; } = string.Empty;
+
+ ///
+ /// The routing key (partition key in some MQ systems, used for message routing).
+ ///
+ public string RoutingKey { get; set; } = string.Empty;
+
+ ///
+ /// Delay in milliseconds before the message is delivered.
+ ///
+ public long DelayMilliseconds { get; set; }
+
+ ///
+ /// Optional unique identifier for the message.
+ ///
+ public string? MessageId { get; set; }
+
+ ///
+ /// Additional arguments for the publish configuration (MQ-specific).
+ ///
+ public Dictionary Arguments { get; set; } = [];
+
+ ///
+ /// Json serializer options
+ ///
+ public JsonSerializerOptions? JsonOptions { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs b/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs
index 54fc80a90..a39e4bf05 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs
@@ -12,5 +12,5 @@ public interface IInstructHook : IHookBase
Task OnResponseGenerated(InstructResponseModel response) => Task.CompletedTask;
Task BeforeCodeExecution(Agent agent, CodeExecutionContext context) => Task.CompletedTask;
- Task AfterCodeExecution(Agent agent, CodeExecutionResponseModel response) => Task.CompletedTask;
+ Task AfterCodeExecution(Agent agent, CodeExecutionContext context, CodeExecutionResponseModel response) => Task.CompletedTask;
}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs b/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs
index f5758434c..81023135f 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs
@@ -28,7 +28,7 @@ public virtual async Task BeforeCodeExecution(Agent agent, CodeExecutionContext
await Task.CompletedTask;
}
- public virtual async Task AfterCodeExecution(Agent agent, CodeExecutionResponseModel response)
+ public virtual async Task AfterCodeExecution(Agent agent, CodeExecutionContext context, CodeExecutionResponseModel response)
{
await Task.CompletedTask;
}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs
new file mode 100644
index 000000000..49f2029a7
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs
@@ -0,0 +1,13 @@
+using BotSharp.Abstraction.Hooks;
+using BotSharp.Abstraction.Rules.Models;
+
+namespace BotSharp.Abstraction.Rules.Hooks;
+
+public interface IRuleTriggerHook : IHookBase
+{
+ Task BeforeRuleConditionExecuting(Agent agent, RuleNode conditionNode, IRuleTrigger trigger, RuleFlowContext context) => Task.CompletedTask;
+ Task AfterRuleConditionExecuted(Agent agent, RuleNode conditionNode, IRuleTrigger trigger, RuleFlowContext context, RuleNodeResult result) => Task.CompletedTask;
+
+ Task BeforeRuleActionExecuting(Agent agent, RuleNode actionNode, IRuleTrigger trigger, RuleFlowContext context) => Task.CompletedTask;
+ Task AfterRuleActionExecuted(Agent agent, RuleNode actionNode, IRuleTrigger trigger, RuleFlowContext context, RuleNodeResult result) => Task.CompletedTask;
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs
index bebac5d6f..041675a2c 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs
@@ -1,5 +1,21 @@
+using BotSharp.Abstraction.Rules.Models;
+
namespace BotSharp.Abstraction.Rules;
-public interface IRuleAction
+///
+/// Base interface for rule actions that can be executed by the RuleEngine
+///
+public interface IRuleAction : IRuleFlowUnit
{
-}
+ ///
+ /// Execute the rule action
+ ///
+ /// The agent that triggered the rule
+ /// The rule trigger
+ /// The flow context
+ /// The action execution result
+ Task ExecuteAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleFlowContext context);
+}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCondition.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCondition.cs
new file mode 100644
index 000000000..745ad3349
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCondition.cs
@@ -0,0 +1,22 @@
+using BotSharp.Abstraction.Rules.Models;
+
+namespace BotSharp.Abstraction.Rules;
+
+///
+/// Base interface for rule conditions that can be evaluated by the RuleEngine
+///
+public interface IRuleCondition : IRuleFlowUnit
+{
+ ///
+ /// Evaluate the rule condition
+ ///
+ /// The agent that triggered the rule
+ /// The rule trigger
+ /// The flow context
+ /// The condition evaluation result
+ Task EvaluateAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleFlowContext context);
+}
+
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleConfig.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleConfig.cs
deleted file mode 100644
index dcbe18271..000000000
--- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleConfig.cs
+++ /dev/null
@@ -1,5 +0,0 @@
-namespace BotSharp.Abstraction.Rules;
-
-public interface IRuleConfig
-{
-}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs
deleted file mode 100644
index bc5022911..000000000
--- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs
+++ /dev/null
@@ -1,5 +0,0 @@
-namespace BotSharp.Abstraction.Rules;
-
-public interface IRuleCriteria
-{
-}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs
index c7a6d847b..d9958e549 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs
@@ -1,3 +1,5 @@
+using BotSharp.Abstraction.Rules.Models;
+
namespace BotSharp.Abstraction.Rules;
public interface IRuleEngine
@@ -13,4 +15,15 @@ public interface IRuleEngine
///
Task> Triggered(IRuleTrigger trigger, string text, IEnumerable? states = null, RuleTriggerOptions? options = null)
=> throw new NotImplementedException();
+
+ ///
+ /// Execute rule graph node
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ Task ExecuteGraphNode(RuleNode node, RuleGraph graph, string agentId, IRuleTrigger trigger, RuleNodeExecutionOptions options);
}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleFlow.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleFlow.cs
new file mode 100644
index 000000000..71dca2ac9
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleFlow.cs
@@ -0,0 +1,26 @@
+using BotSharp.Abstraction.Rules.Models;
+
+namespace BotSharp.Abstraction.Rules;
+
+public interface IRuleFlow where T : class
+{
+ ///
+ /// Rule flow topology name
+ ///
+ string Name { get; }
+
+ ///
+ /// Get rule flow topology config
+ ///
+ ///
+ ///
+ Task GetTopologyConfigAsync(RuleFlowConfigOptions? options = null);
+
+ ///
+ /// Get rule flow topology
+ ///
+ ///
+ ///
+ ///
+ Task GetTopologyAsync(string id, RuleFlowLoadOptions? options = null);
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleFlowUnit.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleFlowUnit.cs
new file mode 100644
index 000000000..795b2ce01
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleFlowUnit.cs
@@ -0,0 +1,21 @@
+using BotSharp.Abstraction.Rules.Models;
+
+namespace BotSharp.Abstraction.Rules;
+
+public interface IRuleFlowUnit
+{
+ ///
+ /// The unique name of the rule flow unit, i.e., action, condition.
+ ///
+ string Name => string.Empty;
+
+ ///
+ /// The agent id
+ ///
+ string? AgentId => null;
+
+ ///
+ /// The trigger names
+ ///
+ IEnumerable? Triggers => null;
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleConfigModel.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleConfigModel.cs
new file mode 100644
index 000000000..69b5915c0
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleConfigModel.cs
@@ -0,0 +1,10 @@
+using System.Text.Json;
+
+namespace BotSharp.Abstraction.Rules.Models;
+
+public class RuleConfigModel
+{
+ public string TopologyId { get; set; }
+ public string TopologyName { get; set; }
+ public JsonDocument CustomParameters { get; set; } = JsonDocument.Parse("{}");
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleFlowContext.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleFlowContext.cs
new file mode 100644
index 000000000..4e9040efe
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleFlowContext.cs
@@ -0,0 +1,18 @@
+using System.Text.Json;
+
+namespace BotSharp.Abstraction.Rules.Models;
+
+///
+/// Context for rule flow execution (actions and conditions)
+///
+public class RuleFlowContext
+{
+ public string Text { get; set; } = string.Empty;
+ public Dictionary Parameters { get; set; } = [];
+ public IEnumerable PrevStepResults { get; set; } = [];
+ public JsonSerializerOptions? JsonOptions { get; set; }
+ public RuleNode Node { get; set; }
+ public RuleEdge Edge { get; set; }
+ public RuleGraph Graph { get; set; }
+}
+
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleFlowStepResult.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleFlowStepResult.cs
new file mode 100644
index 000000000..b4b6d5953
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleFlowStepResult.cs
@@ -0,0 +1,22 @@
+namespace BotSharp.Abstraction.Rules.Models;
+
+public class RuleFlowStepResult : RuleNodeResult
+{
+ public RuleNode Node { get; set; }
+
+ ///
+ /// Create a RuleFlowStepResult from a RuleNodeResult and a RuleNode
+ ///
+ public static RuleFlowStepResult FromResult(RuleNodeResult result, RuleNode node)
+ {
+ return new RuleFlowStepResult
+ {
+ Node = node,
+ Success = result.Success,
+ Response = result.Response,
+ ErrorMessage = result.ErrorMessage,
+ Data = new(result.Data ?? []),
+ IsDelayed = result.IsDelayed
+ };
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleNodeResult.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleNodeResult.cs
new file mode 100644
index 000000000..113126f0d
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleNodeResult.cs
@@ -0,0 +1,29 @@
+namespace BotSharp.Abstraction.Rules.Models;
+
+public class RuleNodeResult
+{
+ ///
+ /// Whether the node is executed successfully
+ ///
+ public virtual bool Success { get; set; }
+
+ ///
+ /// Response content from the node
+ ///
+ public virtual string? Response { get; set; }
+
+ ///
+ /// Error message if the node execution failed
+ ///
+ public virtual string? ErrorMessage { get; set; }
+
+ ///
+ /// Result data (used for actions)
+ ///
+ public virtual Dictionary Data { get; set; } = [];
+
+ ///
+ /// Whether the node execution is delayed (used for actions)
+ ///
+ public virtual bool IsDelayed { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowConfigOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowConfigOptions.cs
new file mode 100644
index 000000000..ddb3e734f
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowConfigOptions.cs
@@ -0,0 +1,6 @@
+namespace BotSharp.Abstraction.Rules.Options;
+
+public class RuleFlowConfigOptions
+{
+ public string? TopologyName { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowLoadOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowLoadOptions.cs
new file mode 100644
index 000000000..38c653bcf
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowLoadOptions.cs
@@ -0,0 +1,7 @@
+namespace BotSharp.Abstraction.Rules.Options;
+
+public class RuleFlowLoadOptions
+{
+ public string? Query { get; set; }
+ public Dictionary? Parameters { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowOptions.cs
new file mode 100644
index 000000000..6d8709107
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleFlowOptions.cs
@@ -0,0 +1,34 @@
+namespace BotSharp.Abstraction.Rules.Options;
+
+public class RuleFlowOptions
+{
+ ///
+ /// Flow topology provider
+ ///
+ [JsonPropertyName("topology_provider")]
+ public string? TopologyProvider { get; set; }
+
+ ///
+ /// Flow topology name
+ ///
+ [JsonPropertyName("topology_name")]
+ public string? TopologyName { get; set; }
+
+ ///
+ /// Query to get flow topology
+ ///
+ [JsonPropertyName("query")]
+ public string? Query { get; set; }
+
+ ///
+ /// Graph traversal algorithm: "dfs" (default) or "bfs"
+ ///
+ [JsonPropertyName("traversal_algorithm")]
+ public string TraversalAlgorithm { get; set; } = "dfs";
+
+ ///
+ /// Additional custom parameters, e.g., root_node_name, max_recursion
+ ///
+ [JsonPropertyName("parameters")]
+ public Dictionary Parameters { get; set; } = [];
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleNodeExecutionOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleNodeExecutionOptions.cs
new file mode 100644
index 000000000..2bc5b5c52
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleNodeExecutionOptions.cs
@@ -0,0 +1,11 @@
+using System.Text.Json;
+
+namespace BotSharp.Abstraction.Rules.Options;
+
+public class RuleNodeExecutionOptions
+{
+ public string Text { get; set; }
+ public IEnumerable States { get; set; } = [];
+ public JsonSerializerOptions? JsonOptions { get; set; }
+ public RuleFlowOptions? Flow { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs
index 068052b0b..46e35fe86 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs
@@ -1,3 +1,4 @@
+using BotSharp.Abstraction.Repositories.Filters;
using System.Text.Json;
namespace BotSharp.Abstraction.Rules.Options;
@@ -5,22 +6,17 @@ namespace BotSharp.Abstraction.Rules.Options;
public class RuleTriggerOptions
{
///
- /// Code processor provider
+ /// Filter agents
///
- public string? CodeProcessor { get; set; }
+ public AgentFilter? AgentFilter { get; set; }
///
- /// Code script name
+ /// Json serializer options
///
- public string? CodeScriptName { get; set; }
+ public JsonSerializerOptions? JsonOptions { get; set; }
///
- /// Argument name as an input key to the code script
+ /// Rule flow options
///
- public string? ArgumentName { get; set; }
-
- ///
- /// Json arguments as an input value to the code script
- ///
- public JsonDocument? ArgumentContent { get; set; }
+ public RuleFlowOptions? Flow { get; set; }
}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/RuleGraph.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/RuleGraph.cs
new file mode 100644
index 000000000..632a3df2d
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/RuleGraph.cs
@@ -0,0 +1,229 @@
+namespace BotSharp.Abstraction.Rules;
+
+public class RuleGraph
+{
+ private string _id = Guid.NewGuid().ToString();
+ private List _nodes = [];
+ private List _edges = [];
+
+ public RuleGraph()
+ {
+ _id = Guid.NewGuid().ToString();
+ _nodes = [];
+ _edges = [];
+ }
+
+ public static RuleGraph Init()
+ {
+ return new RuleGraph();
+ }
+
+ public RuleNode? GetRootNode(string? name = null)
+ {
+ if (!string.IsNullOrEmpty(name))
+ {
+ return _nodes.FirstOrDefault(x => x.Name.IsEqualTo(name));
+ }
+
+ return _nodes.FirstOrDefault(x => x.Type.IsEqualTo("root") || x.Type.IsEqualTo("start"));
+ }
+
+ public (RuleNode? Node, IEnumerable IncomingEdges, IEnumerable OutgoingEdges) GetNode(string id)
+ {
+ var node = _nodes.FirstOrDefault(x => x.Id.IsEqualTo(id));
+ if (node == null)
+ {
+ return (null, [], []);
+ }
+
+ var incomingEdges = _edges
+ .Where(e => e.To != null && e.To.Id.IsEqualTo(id))
+ .OrderByDescending(x => x.Weight)
+ .ToList();
+ var outgoingEdges = _edges
+ .Where(e => e.From != null && e.From.Id.IsEqualTo(id))
+ .OrderByDescending(x => x.Weight)
+ .ToList();
+
+ return (node, incomingEdges, outgoingEdges);
+ }
+
+ public string GetGraphId()
+ {
+ return _id;
+ }
+
+ public IEnumerable GetNodes(Func? filter = null)
+ {
+ return filter == null ? [.. _nodes] : [.. _nodes.Where(filter)];
+ }
+
+ public IEnumerable GetEdges(Func? filter = null)
+ {
+ return filter == null ? [.. _edges] : [.. _edges.Where(filter)];
+ }
+
+ public void SetGraphId(string id)
+ {
+ _id = id;
+ }
+
+ public void SetNodes(IEnumerable nodes)
+ {
+ _nodes = [.. nodes?.ToList() ?? []];
+ }
+
+ public void SetEdges(IEnumerable edges)
+ {
+ _edges = [.. edges?.ToList() ?? []];
+ }
+
+ public void AddNode(RuleNode node)
+ {
+ var found = _nodes.Exists(x => x.Id.IsEqualTo(node.Id));
+ if (!found)
+ {
+ _nodes.Add(node);
+ }
+ }
+
+ public void AddEdge(RuleNode from, RuleNode to, GraphItemPayload payload)
+ {
+ var sourceFound = _nodes.Exists(x => x.Id.IsEqualTo(from.Id));
+ var targetFound = _nodes.Exists(x => x.Id.IsEqualTo(to.Id));
+ var edgeFound = _edges.Exists(x => x.Id.IsEqualTo(payload.Id));
+
+ if (!sourceFound)
+ {
+ _nodes.Add(from);
+ }
+
+ if (!targetFound)
+ {
+ _nodes.Add(to);
+ }
+
+ if (!edgeFound)
+ {
+ _edges.Add(new RuleEdge(from, to)
+ {
+ Id = payload.Id,
+ Name = payload.Name,
+ Type = payload.Type,
+ Labels = [.. payload.Labels ?? []],
+ Weight = payload.Weight,
+ Purpose = payload.Purpose,
+ Config = new(payload.Config ?? [])
+ });
+ }
+ }
+
+ public IEnumerable<(RuleNode, RuleEdge)> GetParentNodes(RuleNode node)
+ {
+ return _edges.Where(e => e.To != null && e.To.Id.IsEqualTo(node.Id))
+ .OrderByDescending(e => e.Weight)
+ .Select(e => (e.From, e))
+ .ToList();
+ }
+
+ public IEnumerable<(RuleNode, RuleEdge)> GetChildrenNodes(RuleNode node)
+ {
+ return _edges.Where(e => e.From != null && e.From.Id.IsEqualTo(node.Id))
+ .OrderByDescending(e => e.Weight)
+ .Select(e => (e.To, e))
+ .ToList();
+ }
+
+ public RuleGraphInfo GetGraphInfo()
+ {
+ return new()
+ {
+ GraphId = _id,
+ Nodes = [.. _nodes?.ToList() ?? []],
+ Edges = [.. _edges?.ToList() ?? []]
+ };
+ }
+
+ public void Clear()
+ {
+ _nodes = [];
+ _edges = [];
+ }
+
+ public static RuleGraph FromGraphInfo(RuleGraphInfo graphInfo)
+ {
+ var graph = new RuleGraph();
+ graph.SetGraphId(graphInfo.GraphId.IfNullOrEmptyAs(Guid.NewGuid().ToString())!);
+ graph.SetNodes(graphInfo.Nodes);
+ graph.SetEdges(graphInfo.Edges);
+ return graph;
+ }
+
+ public override string ToString()
+ {
+ return $"Graph ({_id}) => Nodes: {_nodes.Count}, Edges: {_edges.Count}";
+ }
+}
+
+public class RuleNode : GraphItem
+{
+ ///
+ /// Node type: root, criteria, action, etc.
+ ///
+ public override string Type { get; set; } = "action";
+
+ public override string ToString()
+ {
+ return $"Node ({Id}): {Name} ({Type} => {Purpose})";
+ }
+}
+
+public class RuleEdge : GraphItem
+{
+ ///
+ /// Edge type: is_next, etc.
+ ///
+ public override string Type { get; set; } = "next";
+
+ public RuleNode From { get; set; }
+ public RuleNode To { get; set; }
+
+ public RuleEdge() : base()
+ {
+
+ }
+
+ public RuleEdge(RuleNode from, RuleNode to) : base()
+ {
+ Id = Guid.NewGuid().ToString();
+ From = from;
+ To = to;
+ }
+
+ public override string ToString()
+ {
+ return $"Edge ({Id}): {Name} ({Type} => {Purpose}), Connects from Node ({From?.Name}) to Node ({To?.Name})";
+ }
+}
+
+public class GraphItem
+{
+ public virtual string Id { get; set; } = Guid.NewGuid().ToString();
+ public virtual string Name { get; set; } = null!;
+ public virtual string Type { get; set; } = null!;
+ public virtual IEnumerable Labels { get; set; } = [];
+ public virtual double Weight { get; set; } = 1.0;
+ public virtual string? Purpose { get; set; }
+ public virtual Dictionary Config { get; set; } = [];
+}
+
+public class GraphItemPayload : GraphItem
+{
+}
+
+public class RuleGraphInfo
+{
+ public string GraphId { get; set; }
+ public IEnumerable Nodes { get; set; } = [];
+ public IEnumerable Edges { get; set; } = [];
+}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs b/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs
index a36516c8a..c0f858fd5 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs
@@ -65,4 +65,70 @@ public static class ObjectExtensions
return null;
}
}
+
+ public static T? TryGetValueOrDefault(this IDictionary dict, string key, T? defaultValue = default, JsonSerializerOptions? jsonOptions = null)
+ {
+ return dict.TryGetValue(key, out var value, jsonOptions)
+ ? value!
+ : defaultValue;
+ }
+
+ public static bool TryGetValue(this IDictionary dict, string key, out T? result, JsonSerializerOptions? jsonOptions = null)
+ {
+ result = default;
+
+ if (!dict.TryGetValue(key, out var value) || value is null)
+ {
+ return false;
+ }
+
+ if (value is T t)
+ {
+ result = t;
+ return true;
+ }
+
+ if (value is JsonElement je)
+ {
+ try
+ {
+ result = je.Deserialize(jsonOptions);
+ return true;
+ }
+ catch
+ {
+ return false;
+ }
+ }
+
+ return false;
+ }
+
+
+ public static T? TryGetObjectValueOrDefault(this IDictionary dict, string key, T? defaultValue = default, JsonSerializerOptions? jsonOptions = null) where T : class
+ {
+ return dict.TryGetObjectValue(key, out var value, jsonOptions)
+ ? value!
+ : defaultValue;
+ }
+
+ public static bool TryGetObjectValue(this IDictionary dict, string key, out T? result, JsonSerializerOptions? jsonOptions = null) where T : class
+ {
+ result = default;
+
+ if (!dict.TryGetValue(key, out var value) || value is null)
+ {
+ return false;
+ }
+
+ try
+ {
+ result = JsonSerializer.Deserialize(value, jsonOptions);
+ return true;
+ }
+ catch
+ {
+ return false;
+ }
+ }
}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Actions/ChatRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Actions/ChatRuleAction.cs
new file mode 100644
index 000000000..e525ad9a0
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Actions/ChatRuleAction.cs
@@ -0,0 +1,82 @@
+namespace BotSharp.Core.Rules.Actions;
+
+public sealed class ChatRuleAction : IRuleAction
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+
+ public ChatRuleAction(
+ IServiceProvider services,
+ ILogger logger)
+ {
+ _services = services;
+ _logger = logger;
+ }
+
+ public string Name => "send_message_to_agent";
+
+ public async Task ExecuteAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleFlowContext context)
+ {
+ using var scope = _services.CreateScope();
+ var sp = scope.ServiceProvider;
+
+ try
+ {
+ var channel = trigger.Channel;
+ var convService = sp.GetRequiredService();
+ var conv = await convService.NewConversation(new Conversation
+ {
+ Channel = channel,
+ Title = context.Text,
+ AgentId = agent.Id
+ });
+
+ var message = new RoleDialogModel(AgentRole.User, context.Text);
+
+ var allStates = new List
+ {
+ new("channel", channel)
+ };
+
+ if (!context.Parameters.IsNullOrEmpty())
+ {
+ var states = context.Parameters.Where(x => x.Value != null).Select(x => new MessageState(x.Key, x.Value!));
+ allStates.AddRange(states);
+ }
+
+ await convService.SetConversationId(conv.Id, allStates);
+ await convService.SendMessage(agent.Id,
+ message,
+ null,
+ msg => Task.CompletedTask);
+
+ var data = new Dictionary(convService.States.GetStates() ?? []);
+ await convService.SaveStates();
+
+ _logger.LogInformation("Chat rule action executed successfully for agent {AgentId}, conversation {ConversationId}", agent.Id, conv.Id);
+
+
+ data["agent_id"] = agent.Id;
+ data["conversation_id"] = conv.Id;
+
+ return new RuleNodeResult
+ {
+ Success = true,
+ Response = conv.Id,
+ Data = data
+ };
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error when sending chat via rule action for agent {AgentId} and trigger {TriggerName}", agent.Id, trigger.Name);
+ return new RuleNodeResult
+ {
+ Success = false,
+ ErrorMessage = ex.Message
+ };
+ }
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Actions/FunctionCallRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Actions/FunctionCallRuleAction.cs
new file mode 100644
index 000000000..e9b040e76
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Actions/FunctionCallRuleAction.cs
@@ -0,0 +1,54 @@
+using BotSharp.Abstraction.Functions;
+
+namespace BotSharp.Core.Rules.Actions;
+
+public sealed class FunctionCallRuleAction : IRuleAction
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+
+ public FunctionCallRuleAction(
+ IServiceProvider services,
+ ILogger logger)
+ {
+ _services = services;
+ _logger = logger;
+ }
+
+ public string Name => "function_call";
+
+ public async Task ExecuteAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleFlowContext context)
+ {
+ var funcName = context.Parameters.TryGetValue("function_name", out var fName) ? fName : null;
+ var func = _services.GetServices().FirstOrDefault(x => x.Name.IsEqualTo(funcName));
+
+ if (func == null)
+ {
+ var errorMsg = $"Unable to find function '{funcName}' when running action {agent.Name}-{trigger.Name}";
+ _logger.LogWarning(errorMsg);
+ return new RuleNodeResult
+ {
+ Success = false,
+ ErrorMessage = errorMsg
+ };
+ }
+
+ var funcArg = context.Parameters.TryGetObjectValueOrDefault("function_argument", new()) ?? new();
+ await func.Execute(funcArg);
+
+ return new RuleNodeResult
+ {
+ Success = true,
+ Response = funcArg?.RichContent?.Message?.Text ?? funcArg?.Content,
+ Data = new()
+ {
+ ["function_name"] = funcName!,
+ ["function_argument"] = funcArg?.ConvertToString() ?? "{}",
+ ["function_call_result"] = funcArg?.RichContent?.Message?.Text ?? funcArg?.Content ?? string.Empty
+ }
+ };
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Actions/HttpRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Actions/HttpRuleAction.cs
new file mode 100644
index 000000000..402f098a9
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Actions/HttpRuleAction.cs
@@ -0,0 +1,204 @@
+using System.Net.Mime;
+using System.Text.Json;
+using System.Web;
+
+namespace BotSharp.Core.Rules.Actions;
+
+public sealed class HttpRuleAction : IRuleAction
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+ private readonly IHttpClientFactory _httpClientFactory;
+
+ public HttpRuleAction(
+ IServiceProvider services,
+ ILogger logger,
+ IHttpClientFactory httpClientFactory)
+ {
+ _services = services;
+ _logger = logger;
+ _httpClientFactory = httpClientFactory;
+ }
+
+ public string Name => "http_request";
+
+ // Default configuration example:
+ // {
+ // "http_url": "https://dummy.example.com/api/v1/employees",
+ // "http_method": "GET"
+ // }
+
+ public async Task ExecuteAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleFlowContext context)
+ {
+ try
+ {
+ var httpMethod = GetHttpMethod(context);
+ if (httpMethod == null)
+ {
+ var errorMsg = $"HTTP method is not supported in agent rule {agent.Name}-{trigger.Name}";
+ _logger.LogWarning(errorMsg);
+ return new RuleNodeResult
+ {
+ Success = false,
+ ErrorMessage = errorMsg
+ };
+ }
+
+ // Build the full URL
+ var fullUrl = BuildUrl(context);
+
+ using var client = _httpClientFactory.CreateClient();
+
+ // Add headers
+ AddHttpHeaders(client, context);
+
+ // Create request
+ var request = new HttpRequestMessage(httpMethod, fullUrl);
+
+ // Add request body if provided
+ var requestBodyStr = GetHttpRequestBody(context);
+ if (!string.IsNullOrEmpty(requestBodyStr))
+ {
+ request.Content = new StringContent(requestBodyStr, Encoding.UTF8, MediaTypeNames.Application.Json);
+ }
+
+ _logger.LogInformation("Executing HTTP rule action for agent {AgentId}, URL: {Url}, Method: {Method}",
+ agent.Id, fullUrl, httpMethod);
+
+ // Send request
+ var response = await client.SendAsync(request);
+ var responseContent = await response.Content.ReadAsStringAsync();
+
+ if (response.IsSuccessStatusCode)
+ {
+ _logger.LogInformation("HTTP rule action executed successfully for agent {AgentId}, Status: {StatusCode}, Response: {Response}",
+ agent.Id, response.StatusCode, responseContent);
+
+ return new RuleNodeResult
+ {
+ Success = true,
+ Response = responseContent,
+ Data = new()
+ {
+ ["http_response_headers"] = JsonSerializer.Serialize(response.Headers),
+ ["http_response"] = responseContent
+ }
+ };
+ }
+ else
+ {
+ var errorMsg = $"HTTP request failed with status code {response.StatusCode}: {responseContent}";
+ _logger.LogWarning(errorMsg);
+ return new RuleNodeResult
+ {
+ Success = false,
+ ErrorMessage = errorMsg
+ };
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error executing HTTP rule action for agent {AgentId} and trigger {TriggerName}",
+ agent.Id, trigger.Name);
+ return new RuleNodeResult
+ {
+ Success = false,
+ ErrorMessage = ex.Message
+ };
+ }
+ }
+
+ private string BuildUrl(RuleFlowContext context)
+ {
+ var url = context.Parameters.GetValueOrDefault("http_url", string.Empty);
+ if (string.IsNullOrEmpty(url))
+ {
+ throw new ArgumentNullException("Unable to find http_url in context");
+ }
+
+ // Fill in placeholders in url
+ foreach (var param in context.Parameters)
+ {
+ var value = param.Value?.ToString();
+ if (string.IsNullOrEmpty(value))
+ {
+ continue;
+ }
+ url = url.Replace($"{{{param.Key}}}", value);
+ }
+
+ // Add query parameters
+ var queryParams = context.Parameters.TryGetObjectValueOrDefault>("http_query_params");
+ if (!queryParams.IsNullOrEmpty())
+ {
+ var builder = new UriBuilder(url);
+ var query = HttpUtility.ParseQueryString(builder.Query);
+
+ // Add new query params
+ foreach (var kv in queryParams!.Where(x => x.Value != null))
+ {
+ query[kv.Key] = kv.Value!;
+ }
+
+ // Assign merged query back
+ builder.Query = query.ToString();
+ url = builder.ToString();
+ }
+
+ _logger.LogInformation("HTTP url after filling: {Url}", url);
+ return url;
+ }
+
+ private HttpMethod? GetHttpMethod(RuleFlowContext context)
+ {
+ var method = context.Parameters.GetValueOrDefault("http_method", string.Empty);
+ var innerMethod = method?.Trim()?.ToUpper();
+ HttpMethod? matchMethod = null;
+
+ switch (innerMethod)
+ {
+ case "GET":
+ matchMethod = HttpMethod.Get;
+ break;
+ case "POST":
+ matchMethod = HttpMethod.Post;
+ break;
+ case "DELETE":
+ matchMethod = HttpMethod.Delete;
+ break;
+ case "PUT":
+ matchMethod = HttpMethod.Put;
+ break;
+ case "PATCH":
+ matchMethod = HttpMethod.Patch;
+ break;
+ default:
+ break;
+
+ }
+
+ return matchMethod;
+ }
+
+ private void AddHttpHeaders(HttpClient client, RuleFlowContext context)
+ {
+ var headerParams = context.Parameters.TryGetObjectValueOrDefault>("http_request_headers");
+ if (!headerParams.IsNullOrEmpty())
+ {
+ foreach (var header in headerParams!)
+ {
+ client.DefaultRequestHeaders.TryAddWithoutValidation(header.Key, header.Value);
+ }
+ }
+ }
+
+ private string? GetHttpRequestBody(RuleFlowContext context)
+ {
+ var body = context.Parameters.GetValueOrDefault("http_request_body");
+ return body;
+ }
+}
+
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Conditions/AllVisitedRuleCondition.cs b/src/Infrastructure/BotSharp.Core.Rules/Conditions/AllVisitedRuleCondition.cs
new file mode 100644
index 000000000..c9210c690
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Conditions/AllVisitedRuleCondition.cs
@@ -0,0 +1,32 @@
+namespace BotSharp.Core.Rules.Conditions;
+
+public class AllVisitedRuleCondition : IRuleCondition
+{
+ private readonly ILogger _logger;
+
+ public AllVisitedRuleCondition(
+ ILogger logger)
+ {
+ _logger = logger;
+ }
+
+ public string Name => "all_visited";
+
+ public async Task EvaluateAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleFlowContext context)
+ {
+ var currentNode = context.Node;
+ var parents = context.Graph.GetParentNodes(currentNode);
+ var parentNodeIds = parents.Select(x => x.Item1.Id).ToList();
+ var visitedNodeIds = context.PrevStepResults?.Select(x => x.Node.Id)?.ToHashSet() ?? [];
+ var allVisited = parentNodeIds.All(x => visitedNodeIds.Contains(x));
+
+ return new RuleNodeResult
+ {
+ Success = allVisited,
+ Response = allVisited ? "All parent nodes have been visited" : "Missing parenet nodes visiting."
+ };
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Conditions/LoopingCondition.cs b/src/Infrastructure/BotSharp.Core.Rules/Conditions/LoopingCondition.cs
new file mode 100644
index 000000000..91f674330
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Conditions/LoopingCondition.cs
@@ -0,0 +1,167 @@
+using System.Text.Json;
+
+namespace BotSharp.Core.Rules.Conditions;
+
+///
+/// A general loop condition node that iterates over a list of items in context parameters.
+///
+/// Expected parameters:
+/// - "list_items": A JSON array of items to iterate over (e.g. ["a","b","c"] or [1,2,3] or [{...},{...}]).
+/// - "iterate_index": The current iteration index (auto-managed, starts at 0).
+/// - "iterate_current_item": The current item being processed (auto-set each iteration).
+///
+/// Flow:
+/// Action Node → LoopCondition → (true) → back to Action Node
+/// → (false) → resets list_items, iterate_index, iterate_current_item and continues
+///
+public sealed class LoopingCondition : IRuleCondition
+{
+ private const string PARAM_LIST_ITEMS = "list_items";
+ private const string PARAM_LIST_ITEMS_KEY = "list_items_key";
+ private const string PARAM_ITERATE_INDEX = "iterate_index";
+ private const string PARAM_ITERATE_ITEM_KEY = "iterate_item_key";
+ private const string PARAM_ITERATE_NEXT_ITEM = "iterate_next_item";
+
+ private readonly ILogger _logger;
+
+ public LoopingCondition(ILogger logger)
+ {
+ _logger = logger;
+ }
+
+ public string Name => "looping";
+
+ public async Task EvaluateAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleFlowContext context)
+ {
+ try
+ {
+ context.Parameters ??= [];
+
+ var listItemsRaw = string.Empty;
+ var listItemsKey = context.Parameters.GetValueOrDefault(PARAM_LIST_ITEMS_KEY, string.Empty);
+ if (!string.IsNullOrWhiteSpace(listItemsKey))
+ {
+ listItemsRaw = context.Parameters.GetValueOrDefault(listItemsKey, string.Empty);
+ }
+ else
+ {
+ listItemsRaw = context.Parameters.GetValueOrDefault(PARAM_LIST_ITEMS, string.Empty);
+ }
+
+ if (string.IsNullOrWhiteSpace(listItemsRaw))
+ {
+ _logger.LogInformation("Loop condition: list items are empty, loop completed (agent {AgentId}).", agent.Id);
+ CleanLoopState(context);
+ return new RuleNodeResult
+ {
+ Success = false,
+ Response = "Loop completed: list_items is empty."
+ };
+ }
+
+ // Deserialize list_items as a JSON array of any type
+ var items = JsonSerializer.Deserialize(listItemsRaw, new JsonSerializerOptions
+ {
+ PropertyNameCaseInsensitive = true
+ });
+
+ if (items.IsNullOrEmpty())
+ {
+ _logger.LogInformation("Loop condition: no items to iterate, loop completed (agent {AgentId}).", agent.Id);
+ CleanLoopState(context);
+ return new RuleNodeResult
+ {
+ Success = false,
+ Response = "Loop completed: no items in list."
+ };
+ }
+
+ // If iterate_index is not yet set, this is the first visit after the action node
+ // already handled item[0], so start from index 1.
+ var indexStr = context.Parameters.GetValueOrDefault(PARAM_ITERATE_INDEX);
+ int currentIndex;
+ if (string.IsNullOrEmpty(indexStr))
+ {
+ currentIndex = 0;
+ }
+ else if (!int.TryParse(indexStr, out currentIndex))
+ {
+ currentIndex = 0;
+ }
+
+ var nextIndex = currentIndex + 1;
+ if (currentIndex >= items!.Length || nextIndex >= items!.Length)
+ {
+ _logger.LogInformation("Loop condition: iteration complete ({Index}/{Total}) (agent {AgentId}).",
+ currentIndex, items.Length, agent.Id);
+ CleanLoopState(context);
+ return new RuleNodeResult
+ {
+ Success = false,
+ Response = $"Loop completed: iterated over all {items.Length} items."
+ };
+ }
+
+ // Set next item and advance index
+ var nextElement = items[nextIndex];
+ var nextItem = nextElement.ConvertToString();
+ context.Parameters[PARAM_ITERATE_NEXT_ITEM] = nextItem;
+ context.Parameters[PARAM_ITERATE_INDEX] = nextIndex.ToString();
+
+ var data = new Dictionary
+ {
+ [PARAM_ITERATE_NEXT_ITEM] = nextItem,
+ [PARAM_ITERATE_INDEX] = nextIndex.ToString()
+ };
+
+
+ var itemKey = context.Parameters.GetValueOrDefault(PARAM_ITERATE_ITEM_KEY);
+ if (!string.IsNullOrEmpty(itemKey)
+ && nextElement.ValueKind == JsonValueKind.Object
+ && nextElement.TryGetProperty(itemKey, out var fieldValue))
+ {
+ var fieldStr = fieldValue.ToString();
+ context.Parameters[itemKey] = fieldStr;
+ data[itemKey] = fieldStr;
+ }
+
+ _logger.LogInformation("Loop condition: processing item {Index}/{Total} = '{Item}' (agent {AgentId}).",
+ nextItem, items.Length, nextElement, agent.Id);
+
+ return new RuleNodeResult
+ {
+ Success = true,
+ Response = $"Loop iteration {nextIndex}/{items.Length}: next item = {nextItem}",
+ Data = data
+ };
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error evaluating loop condition for agent {AgentId}", agent.Id);
+ CleanLoopState(context);
+ return new RuleNodeResult
+ {
+ Success = false,
+ ErrorMessage = ex.Message
+ };
+ }
+ }
+
+ private void CleanLoopState(RuleFlowContext context)
+ {
+ var itemKey = context.Parameters?.GetValueOrDefault(PARAM_ITERATE_ITEM_KEY);
+
+ context.Parameters?.Remove(PARAM_LIST_ITEMS);
+ context.Parameters?.Remove(PARAM_ITERATE_INDEX);
+ context.Parameters?.Remove(PARAM_ITERATE_ITEM_KEY);
+ context.Parameters?.Remove(PARAM_ITERATE_NEXT_ITEM);
+
+ if (!string.IsNullOrEmpty(itemKey))
+ {
+ context.Parameters?.Remove(itemKey);
+ }
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs b/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs
new file mode 100644
index 000000000..9598df345
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs
@@ -0,0 +1,17 @@
+namespace BotSharp.Core.Rules.Constants;
+
+public static class RuleConstant
+{
+ public const int MAX_GRAPH_RECURSION = 50;
+
+ public static IEnumerable CONDITION_NODE_TYPES = new List
+ {
+ "condition",
+ "criteria"
+ };
+
+ public static IEnumerable ACTION_NODE_TYPES = new List
+ {
+ "action"
+ };
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs b/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs
new file mode 100644
index 000000000..feb314ef7
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs
@@ -0,0 +1,42 @@
+using BotSharp.Core.Rules.Models;
+using Microsoft.AspNetCore.Authorization;
+using Microsoft.AspNetCore.Mvc;
+
+namespace BotSharp.Core.Rules.Controllers;
+
+[Authorize]
+[ApiController]
+public class RuleController : ControllerBase
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+ private readonly IRuleEngine _ruleEngine;
+
+ public RuleController(
+ IServiceProvider services,
+ ILogger logger,
+ IRuleEngine ruleEngine)
+ {
+ _services = services;
+ _logger = logger;
+ _ruleEngine = ruleEngine;
+ }
+
+ [HttpPost("/rule/trigger/action")]
+ public async Task RunAction([FromBody] RuleTriggerActionRequest request)
+ {
+ if (request == null)
+ {
+ return BadRequest(new { Success = false, Error = "Request cannnot be empty." });
+ }
+
+ var trigger = _services.GetServices().FirstOrDefault(x => x.Name.IsEqualTo(request.TriggerName));
+ if (trigger == null)
+ {
+ return BadRequest(new { Success = false, Error = "Unable to find rule trigger." });
+ }
+
+ var result = await _ruleEngine.Triggered(trigger, request.Text, request.States, request.Options);
+ return Ok(new { Success = true });
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/DemoRuleTrigger.cs b/src/Infrastructure/BotSharp.Core.Rules/DemoRuleTrigger.cs
new file mode 100644
index 000000000..70a9eac22
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/DemoRuleTrigger.cs
@@ -0,0 +1,10 @@
+namespace BotSharp.Core.Rules;
+
+public class DemoRuleTrigger : IRuleTrigger
+{
+ public string Channel => "crontab";
+ public string Name => nameof(DemoRuleTrigger);
+
+ public string EntityType { get; set; } = "DemoType";
+ public string EntityId { get; set; } = "DemoId";
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs b/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs
index 5f68b722d..8b5d846a9 100644
--- a/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs
+++ b/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs
@@ -1,36 +1,16 @@
-using BotSharp.Abstraction.Agents.Models;
-using BotSharp.Abstraction.Coding;
-using BotSharp.Abstraction.Coding.Contexts;
-using BotSharp.Abstraction.Coding.Enums;
-using BotSharp.Abstraction.Coding.Models;
-using BotSharp.Abstraction.Coding.Settings;
-using BotSharp.Abstraction.Coding.Utils;
-using BotSharp.Abstraction.Conversations;
-using BotSharp.Abstraction.Hooks;
-using BotSharp.Abstraction.Models;
-using BotSharp.Abstraction.Repositories.Filters;
-using BotSharp.Abstraction.Rules.Options;
-using BotSharp.Abstraction.Utilities;
-using Microsoft.Extensions.Logging;
-using System.Data;
-using System.Text.Json;
-
namespace BotSharp.Core.Rules.Engines;
public class RuleEngine : IRuleEngine
{
private readonly IServiceProvider _services;
private readonly ILogger _logger;
- private readonly CodingSettings _codingSettings;
public RuleEngine(
IServiceProvider services,
- ILogger logger,
- CodingSettings codingSettings)
+ ILogger logger)
{
_services = services;
_logger = logger;
- _codingSettings = codingSettings;
}
public async Task> Triggered(IRuleTrigger trigger, string text, IEnumerable? states = null, RuleTriggerOptions? options = null)
@@ -39,7 +19,7 @@ public async Task> Triggered(IRuleTrigger trigger, string te
// Pull all user defined rules
var agentService = _services.GetRequiredService();
- var agents = await agentService.GetAgents(new AgentFilter
+ var agents = await agentService.GetAgents(options?.AgentFilter ?? new AgentFilter
{
Pager = new Pagination
{
@@ -51,154 +31,623 @@ public async Task> Triggered(IRuleTrigger trigger, string te
var filteredAgents = agents.Items.Where(x => x.Rules.Exists(r => r.TriggerName.IsEqualTo(trigger.Name) && !x.Disabled)).ToList();
foreach (var agent in filteredAgents)
{
- // Code trigger
- if (options != null)
+ var rule = agent.Rules.FirstOrDefault(x => x.TriggerName.IsEqualTo(trigger.Name) && !x.Disabled);
+ if (rule == null)
{
- var isTriggered = await TriggerCodeScript(agent, trigger.Name, options);
- if (!isTriggered)
+ continue;
+ }
+
+ var ruleConfig = rule.Config;
+ var ruleFlowTopologyName = options?.Flow?.TopologyName ?? ruleConfig?.TopologyName;
+
+ if (!string.IsNullOrEmpty(ruleFlowTopologyName))
+ {
+ // Execute graph
+ // 1. Load graph
+ var graph = await LoadGraph(ruleFlowTopologyName, agent, trigger, options?.Flow);
+ if (graph == null)
{
continue;
}
- }
- var convService = _services.GetRequiredService();
- var conv = await convService.NewConversation(new Conversation
+ // 2. Get root node
+ var param = options?.Flow?.Parameters;
+ var rootNodeName = param != null ? param.GetValueOrDefault("root_node_name")?.ToString() : null;
+ var root = graph.GetRootNode(rootNodeName);
+ if (root == null)
+ {
+ graph.Clear();
+ continue;
+ }
+
+ // 3. Execute graph
+ var execResults = new List();
+ await ExecuteGraphNode(root, graph, agent, trigger, text, states, null, options, execResults);
+ graph.Clear();
+
+ // Get conversation id to support legacy features
+ var convIds = execResults.Where(x => x.Success && x.Data.TryGetValue("conversation_id", out _))
+ .Select(x => x.Data.GetValueOrDefault("conversation_id", string.Empty))
+ .Where(x => !string.IsNullOrEmpty(x))
+ .ToList();
+
+ newConversationIds.AddRange(convIds);
+ }
+ else
{
- Channel = trigger.Channel,
- Title = text,
- AgentId = agent.Id
- });
+ var convId = await SendMessageToAgent(agent, trigger, text, states);
+ newConversationIds.Add(convId);
+ }
+ }
+
+ return newConversationIds;
+ }
+
+ public async Task ExecuteGraphNode(RuleNode node, RuleGraph graph, string agentId, IRuleTrigger trigger, RuleNodeExecutionOptions options)
+ {
+ if (node == null || graph == null || options == null)
+ {
+ return;
+ }
+
+ var agentService = _services.GetRequiredService();
+ var agent = await agentService.GetAgent(agentId);
+
+ var triggerOptions = new RuleTriggerOptions
+ {
+ Flow = options.Flow,
+ JsonOptions = options.JsonOptions
+ };
- var message = new RoleDialogModel(AgentRole.User, text);
+ var execResults = new List();
+ await ExecuteGraphNode(
+ node, graph,
+ agent, trigger,
+ options.Text,
+ options.States,
+ null,
+ triggerOptions,
+ execResults);
+ graph.Clear();
+ }
- var allStates = new List
+ #region Graph
+ private async Task LoadGraph(string name, Agent agent, IRuleTrigger trigger, RuleFlowOptions? options)
+ {
+ var flow = _services.GetServices>().FirstOrDefault(x => x.Name.IsEqualTo(name));
+ if (flow == null)
+ {
+ return null;
+ }
+
+ try
+ {
+ var config = await flow.GetTopologyConfigAsync(options: new()
{
- new("channel", trigger.Channel)
- };
+ TopologyName = name
+ });
- if (states != null)
+ var topologyId = config?.TopologyId;
+ if (string.IsNullOrEmpty(topologyId))
{
- allStates.AddRange(states);
+ return null;
}
- await convService.SetConversationId(conv.Id, allStates);
- await convService.SendMessage(agent.Id,
- message,
- null,
- msg => Task.CompletedTask);
+ var param = new Dictionary(options?.Parameters ?? []);
+ param["agent"] = param.GetValueOrDefault("agent", agent.Name);
+ param["agent_id"] = param.GetValueOrDefault("agent_id", agent.Id);
+ param["trigger"] = param.GetValueOrDefault("trigger", trigger.Name);
- await convService.SaveStates();
- newConversationIds.Add(conv.Id);
+ return await flow.GetTopologyAsync(topologyId, options: new()
+ {
+ Query = options?.Query,
+ Parameters = param
+ });
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error when loading graph (name: {name}, agent: {agent}, trigger: {trigger?.Name})");
+ return null;
}
+ }
- return newConversationIds;
+ private async Task ExecuteGraphNode(
+ RuleNode node,
+ RuleGraph graph,
+ Agent agent,
+ IRuleTrigger trigger,
+ string text,
+ IEnumerable? states,
+ Dictionary? data,
+ RuleTriggerOptions? options,
+ List results)
+ {
+ try
+ {
+ if (options?.Flow?.TraversalAlgorithm?.IsEqualTo("bfs") == true)
+ {
+ await ExecuteGraphNodeBfs(node, graph, agent, trigger, text, states, data, options, results);
+ }
+ else
+ {
+ await ExecuteGraphNodeDfs(node, graph, agent, trigger, text, states, data, options, results);
+ }
+ }
+ catch { }
}
- #region Private methods
- private async Task TriggerCodeScript(Agent agent, string triggerName, RuleTriggerOptions options)
+ private async Task ExecuteGraphNodeDfs(
+ RuleNode node,
+ RuleGraph graph,
+ Agent agent,
+ IRuleTrigger trigger,
+ string text,
+ IEnumerable? states,
+ Dictionary? data,
+ RuleTriggerOptions? options,
+ List results)
{
- if (string.IsNullOrWhiteSpace(agent?.Id))
+ // Check whether the action nodes have been visited more than limit
+ var visited = results.Count();
+ var param = options?.Flow?.Parameters ?? [];
+ var maxRecursion = int.TryParse(param.GetValueOrDefault("max_recursion")?.ToString(), out var depth) && depth > 0
+ ? depth : RuleConstant.MAX_GRAPH_RECURSION;
+
+ var innerData = new Dictionary(data ?? []);
+
+ if (visited >= maxRecursion)
{
- return false;
+ _logger.LogWarning("Exceed max graph recursion {MaxRecursion} (agent {Agent} and trigger {Trigger}).",
+ maxRecursion, agent.Name, trigger.Name);
+ return;
}
- var provider = options.CodeProcessor ?? BuiltInCodeProcessor.PyInterpreter;
- var processor = _services.GetServices().FirstOrDefault(x => x.Provider.IsEqualTo(provider));
- if (processor == null)
+ // Get current node successors
+ var nextNodes = graph.GetChildrenNodes(node);
+ if (nextNodes.IsNullOrEmpty())
{
- _logger.LogWarning($"Unable to find code processor: {provider}.");
- return false;
+ return;
}
- var agentService = _services.GetRequiredService();
- var scriptName = options.CodeScriptName ?? $"{triggerName}_rule.py";
- var codeScript = await agentService.GetAgentCodeScript(agent.Id, scriptName, scriptType: AgentCodeScriptType.Src);
+ // Visit neighbor nodes
+ foreach (var (nextNode, edge) in nextNodes)
+ {
+ // Build context
+ var context = new RuleFlowContext
+ {
+ Node = nextNode,
+ Edge = edge,
+ Graph = graph,
+ Text = text,
+ Parameters = BuildParameters(nextNode.Config, states, innerData),
+ PrevStepResults = results,
+ JsonOptions = options?.JsonOptions
+ };
+
+ if (RuleConstant.CONDITION_NODE_TYPES.Contains(nextNode.Type))
+ {
+ // Execute condition node
+ var conditionResult = await ExecuteCondition(nextNode, graph, agent, trigger, context);
+ if (conditionResult == null)
+ {
+ results.Add(RuleFlowStepResult.FromResult(new()
+ {
+ Success = false,
+ ErrorMessage = $"Unable to find condition {nextNode.Name}."
+ }, nextNode));
+ continue;
+ }
+
+ results.Add(RuleFlowStepResult.FromResult(conditionResult, nextNode));
+
+ // If condition result is true, then execute the next node, otherwise skip
+ if (conditionResult.Success)
+ {
+ await ExecuteGraphNodeDfs(nextNode, graph, agent, trigger, text, states, context.Parameters, options, results);
+ }
+ else
+ {
+ _logger.LogInformation("Condition {ConditionName} evaluated to false, skipping next node (agent {Agent} and trigger {Trigger}).",
+ nextNode.Name, agent.Name, trigger.Name);
+ }
+ }
+ else if (RuleConstant.ACTION_NODE_TYPES.Contains(nextNode.Type))
+ {
+ // Execute action node
+ var actionResult = await ExecuteAction(nextNode, graph, agent, trigger, context);
+ if (actionResult == null)
+ {
+ results.Add(RuleFlowStepResult.FromResult(new()
+ {
+ Success = false,
+ ErrorMessage = $"Unable to find action {nextNode.Name}."
+ }, nextNode));
+ continue;
+ }
+
+ results.Add(RuleFlowStepResult.FromResult(actionResult, nextNode));
- var msg = $"rule trigger ({triggerName}) code script ({scriptName}) in agent ({agent.Name}) => args: {options.ArgumentContent?.RootElement.GetRawText()}.";
+ if (actionResult.IsDelayed)
+ {
+ continue;
+ }
- if (codeScript == null || string.IsNullOrWhiteSpace(codeScript.Content))
+ await ExecuteGraphNodeDfs(nextNode, graph, agent, trigger, text, states, context.Parameters, options, results);
+ }
+ else
+ {
+ results.Add(RuleFlowStepResult.FromResult(new()
+ {
+ Success = true,
+ Response = $"Pass through node {nextNode.Name}."
+ }, nextNode));
+ await ExecuteGraphNodeDfs(nextNode, graph, agent, trigger, text, states, context.Parameters, options, results);
+ }
+ }
+ }
+
+ private async Task ExecuteGraphNodeBfs(
+ RuleNode root,
+ RuleGraph graph,
+ Agent agent,
+ IRuleTrigger trigger,
+ string text,
+ IEnumerable? states,
+ Dictionary? data,
+ RuleTriggerOptions? options,
+ List results)
+ {
+ var param = options?.Flow?.Parameters ?? [];
+ var maxRecursion = int.TryParse(param.GetValueOrDefault("max_recursion")?.ToString(), out var depth) && depth > 0
+ ? depth : RuleConstant.MAX_GRAPH_RECURSION;
+
+ var innerData = new Dictionary(data ?? []);
+
+ // Each queue entry is (node-to-process, edge-that-leads-to-it)
+ var queue = new Queue<(RuleNode Node, RuleEdge Edge)>();
+
+ foreach (var (childNode, edge) in graph.GetChildrenNodes(root))
{
- _logger.LogWarning($"Unable to find {msg}.");
- return false;
+ queue.Enqueue((childNode, edge));
}
- try
+ while (queue.Count > 0)
{
- var hooks = _services.GetHooks(agent.Id);
+ if (results.Count >= maxRecursion)
+ {
+ _logger.LogWarning("Exceed max graph nodes {MaxNodes} during BFS (agent {Agent} and trigger {Trigger}).",
+ maxRecursion, agent.Name, trigger.Name);
+ break;
+ }
- var arguments = BuildArguments(options.ArgumentName, options.ArgumentContent);
- var context = new CodeExecutionContext
+ var (nextNode, nextEdge) = queue.Dequeue();
+
+ var context = new RuleFlowContext
{
- CodeScript = codeScript,
- Arguments = arguments
+ Node = nextNode,
+ Edge = nextEdge,
+ Graph = graph,
+ Text = text,
+ Parameters = BuildParameters(nextNode.Config, states, innerData),
+ PrevStepResults = results,
+ JsonOptions = options?.JsonOptions
};
- foreach (var hook in hooks)
+ if (RuleConstant.CONDITION_NODE_TYPES.Contains(nextNode.Type))
+ {
+ // Execute condition node
+ var conditionResult = await ExecuteCondition(nextNode, graph, agent, trigger, context);
+ innerData = new(context.Parameters ?? []);
+
+ if (conditionResult == null)
+ {
+ results.Add(RuleFlowStepResult.FromResult(new()
+ {
+ Success = false,
+ ErrorMessage = $"Unable to find condition {nextNode.Name}."
+ }, nextNode));
+ continue;
+ }
+
+ results.Add(RuleFlowStepResult.FromResult(conditionResult, nextNode));
+
+ // If condition is true, enqueue children; otherwise skip the branch
+ if (conditionResult.Success)
+ {
+ foreach (var (childNode, childEdge) in graph.GetChildrenNodes(nextNode))
+ {
+ queue.Enqueue((childNode, childEdge));
+ }
+ }
+ else
+ {
+ _logger.LogInformation("Condition {ConditionName} evaluated to false, skipping next node (agent {Agent} and trigger {Trigger}).",
+ nextNode.Name, agent.Name, trigger.Name);
+ }
+ }
+ else if (RuleConstant.ACTION_NODE_TYPES.Contains(nextNode.Type))
+ {
+ // Execute action node
+ var actionResult = await ExecuteAction(nextNode, graph, agent, trigger, context);
+ innerData = new(context.Parameters ?? []);
+
+ if (actionResult == null)
+ {
+ results.Add(RuleFlowStepResult.FromResult(new()
+ {
+ Success = false,
+ ErrorMessage = $"Unable to find action {nextNode.Name}."
+ }, nextNode));
+ continue;
+ }
+
+ results.Add(RuleFlowStepResult.FromResult(actionResult, nextNode));
+
+ if (!actionResult.IsDelayed)
+ {
+ foreach (var (childNode, childEdge) in graph.GetChildrenNodes(nextNode))
+ {
+ queue.Enqueue((childNode, childEdge));
+ }
+ }
+ }
+ else
{
- await hook.BeforeCodeExecution(agent, context);
+ results.Add(RuleFlowStepResult.FromResult(new()
+ {
+ Success = true,
+ Response = $"Pass through node {nextNode.Name}."
+ }, nextNode));
+
+ foreach (var (childNode, childEdge) in graph.GetChildrenNodes(nextNode))
+ {
+ queue.Enqueue((childNode, childEdge));
+ }
}
+ }
+ }
+ #endregion
- var (useLock, useProcess, timeoutSeconds) = CodingUtil.GetCodeExecutionConfig(_codingSettings);
- using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds));
- var response = processor.Run(codeScript.Content, options: new()
+
+ #region Action
+ private async Task ExecuteAction(
+ RuleNode node,
+ RuleGraph graph,
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleFlowContext context)
+ {
+ try
+ {
+ // Find the matching action
+ var foundAction = GetRuleAction(node, agent, trigger);
+ if (foundAction == null)
{
- ScriptName = scriptName,
- Arguments = arguments,
- UseLock = useLock,
- UseProcess = useProcess
- }, cancellationToken: cts.Token);
+ var errorMsg = $"No rule action {node?.Name} is found";
+ _logger.LogWarning(errorMsg);
+ return null;
+ }
- var codeResponse = new CodeExecutionResponseModel
+ _logger.LogInformation("Start execution rule action {ActionName} for agent {AgentId} with trigger {TriggerName}",
+ foundAction.Name, agent.Id, trigger.Name);
+
+ var hooks = _services.GetHooks(agent.Id);
+ foreach (var hook in hooks)
{
- CodeProcessor = processor.Provider,
- CodeScript = codeScript,
- Arguments = arguments.DistinctBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value ?? string.Empty),
- ExecutionResult = response
- };
+ await hook.BeforeRuleActionExecuting(agent, node, trigger, context);
+ }
+
+ // Execute action
+ context.Parameters ??= [];
+ var result = await foundAction.ExecuteAsync(agent, trigger, context);
foreach (var hook in hooks)
{
- await hook.AfterCodeExecution(agent, codeResponse);
+ await hook.AfterRuleActionExecuted(agent, node, trigger, context, result);
}
- if (response == null || !response.Success)
+ return result;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error executing rule action {ActionName} for agent {AgentId}", node?.Name, agent.Id);
+ return new RuleNodeResult
+ {
+ Success = false,
+ ErrorMessage = ex.Message
+ };
+ }
+ }
+
+ // Find the matching action
+ private IRuleAction? GetRuleAction(RuleNode node, Agent agent, IRuleTrigger trigger)
+ {
+ var actions = _services.GetServices()
+ .Where(x => x.Name.IsEqualTo(node?.Name))
+ .ToList();
+
+ var found = actions.FirstOrDefault(x => !string.IsNullOrEmpty(x.AgentId) && x.AgentId.IsEqualTo(agent.Id) && x.Triggers?.Contains(trigger.Name) == true);
+ if (found != null)
+ {
+ return found;
+ }
+
+ found = actions.FirstOrDefault(x => !string.IsNullOrEmpty(x.AgentId) && x.AgentId.IsEqualTo(agent.Id));
+ if (found != null)
+ {
+ return found;
+ }
+
+ found = actions.FirstOrDefault(x => x.Triggers?.Contains(trigger.Name, StringComparer.OrdinalIgnoreCase) == true);
+ if (found != null)
+ {
+ return found;
+ }
+
+ found = actions.FirstOrDefault();
+ if (found != null)
+ {
+ return found;
+ }
+
+ return null;
+ }
+ #endregion
+
+
+ #region Condition
+ private async Task ExecuteCondition(
+ RuleNode node,
+ RuleGraph graph,
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleFlowContext context)
+ {
+ try
+ {
+ // Find the matching condition
+ var foundCondition = GetRuleCondition(node, agent, trigger);
+ if (foundCondition == null)
{
- _logger.LogWarning($"Failed to handle {msg}");
- return false;
+ var errorMsg = $"No rule condition {node?.Name} is found";
+ _logger.LogWarning(errorMsg);
+ return null;
}
- bool result;
- LogLevel logLevel;
- if (response.Result.IsEqualTo("true"))
+ _logger.LogInformation("Start execution rule condition {ConditionName} for agent {AgentId} with trigger {TriggerName}",
+ foundCondition.Name, agent.Id, trigger.Name);
+
+ var hooks = _services.GetHooks(agent.Id);
+ foreach (var hook in hooks)
{
- logLevel = LogLevel.Information;
- result = true;
+ await hook.BeforeRuleConditionExecuting(agent, node, trigger, context);
}
- else
+
+ // Execute condition
+ context.Parameters ??= [];
+ var result = await foundCondition.EvaluateAsync(agent, trigger, context);
+
+ foreach (var hook in hooks)
{
- logLevel = LogLevel.Warning;
- result = false;
+ await hook.AfterRuleConditionExecuted(agent, node, trigger, context, result);
}
- _logger.Log(logLevel, $"Code script execution result ({response}) from {msg}");
return result;
}
catch (Exception ex)
{
- _logger.LogError(ex, $"Error when handling {msg}");
- return false;
+ _logger.LogError(ex, "Error executing rule condition {ConditionName} for agent {AgentId}", node?.Name, agent.Id);
+ return new RuleNodeResult
+ {
+ Success = false,
+ ErrorMessage = ex.Message
+ };
+ }
+ }
+
+ // Find the matching condition
+ private IRuleCondition? GetRuleCondition(RuleNode node, Agent agent, IRuleTrigger trigger)
+ {
+ var conditions = _services.GetServices()
+ .Where(x => x.Name.IsEqualTo(node?.Name))
+ .ToList();
+
+ var found = conditions.FirstOrDefault(x => !string.IsNullOrEmpty(x.AgentId) && x.AgentId.IsEqualTo(agent.Id) && x.Triggers?.Contains(trigger.Name) == true);
+ if (found != null)
+ {
+ return found;
}
+
+ found = conditions.FirstOrDefault(x => !string.IsNullOrEmpty(x.AgentId) && x.AgentId.IsEqualTo(agent.Id));
+ if (found != null)
+ {
+ return found;
+ }
+
+ found = conditions.FirstOrDefault(x => x.Triggers?.Contains(trigger.Name, StringComparer.OrdinalIgnoreCase) == true);
+ if (found != null)
+ {
+ return found;
+ }
+
+ found = conditions.FirstOrDefault();
+ if (found != null)
+ {
+ return found;
+ }
+
+ return null;
}
+ #endregion
- private List BuildArguments(string? name, JsonDocument? args)
+
+ #region Private methods
+ private Dictionary BuildParameters(
+ Dictionary? config,
+ IEnumerable? states,
+ Dictionary? param = null)
{
- var keyValues = new List();
- if (args != null)
+ var dict = new Dictionary();
+
+ if (config != null)
{
- keyValues.Add(new KeyValue(name ?? "trigger_args", args.RootElement.GetRawText()));
+ dict = new(config);
}
- return keyValues;
+
+ if (!states.IsNullOrEmpty())
+ {
+ foreach (var state in states!)
+ {
+ dict[state.Key] = state.Value?.ConvertToString();
+ }
+ }
+
+ if (!param.IsNullOrEmpty())
+ {
+ foreach (var pair in param!)
+ {
+ dict[pair.Key] = pair.Value;
+ }
+ }
+
+ return dict;
+ }
+ #endregion
+
+
+ #region Legacy conversation
+ private async Task SendMessageToAgent(Agent agent, IRuleTrigger trigger, string text, IEnumerable? states = null)
+ {
+ var convService = _services.GetRequiredService();
+ var conv = await convService.NewConversation(new Conversation
+ {
+ Channel = trigger.Channel,
+ Title = text,
+ AgentId = agent.Id
+ });
+
+ var message = new RoleDialogModel(AgentRole.User, text);
+
+ var allStates = new List
+ {
+ new("channel", trigger.Channel)
+ };
+
+ if (!states.IsNullOrEmpty())
+ {
+ allStates.AddRange(states!);
+ }
+
+ await convService.SetConversationId(conv.Id, allStates);
+ await convService.SendMessage(agent.Id,
+ message,
+ null,
+ msg => Task.CompletedTask);
+
+ await convService.SaveStates();
+
+ return conv.Id;
}
#endregion
}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs
new file mode 100644
index 000000000..1c5e81d71
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs
@@ -0,0 +1,11 @@
+namespace BotSharp.Core.Rules.Models;
+
+public class RuleMessagePayload
+{
+ public string AgentId { get; set; }
+ public string TriggerName { get; set; }
+ public string Channel { get; set; }
+ public string Text { get; set; }
+ public Dictionary States { get; set; }
+ public DateTime Timestamp { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs
new file mode 100644
index 000000000..0abea08b0
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs
@@ -0,0 +1,18 @@
+using System.Text.Json.Serialization;
+
+namespace BotSharp.Core.Rules.Models;
+
+public class RuleTriggerActionRequest
+{
+ [JsonPropertyName("trigger_name")]
+ public string TriggerName { get; set; }
+
+ [JsonPropertyName("text")]
+ public string Text { get; set; } = string.Empty;
+
+ [JsonPropertyName("states")]
+ public IEnumerable? States { get; set; }
+
+ [JsonPropertyName("options")]
+ public RuleTriggerOptions? Options { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs b/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs
index 56e1fb8ae..ace6553ae 100644
--- a/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs
+++ b/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs
@@ -1,3 +1,5 @@
+using BotSharp.Core.Rules.Actions;
+using BotSharp.Core.Rules.Conditions;
using BotSharp.Core.Rules.Engines;
namespace BotSharp.Core.Rules;
@@ -16,6 +18,21 @@ public class RulesPlugin : IBotSharpPlugin
public void RegisterDI(IServiceCollection services, IConfiguration config)
{
+ // Register rule engine
services.AddScoped();
+
+ // Register rule actions
+ services.AddScoped();
+ services.AddScoped();
+ services.AddScoped();
+
+#if DEBUG
+ // Register rule conditions
+ services.AddScoped();
+ services.AddScoped();
+
+ // Register rule trigger
+ services.AddScoped();
+#endif
}
}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Using.cs b/src/Infrastructure/BotSharp.Core.Rules/Using.cs
index a4353c960..2d1dc6844 100644
--- a/src/Infrastructure/BotSharp.Core.Rules/Using.cs
+++ b/src/Infrastructure/BotSharp.Core.Rules/Using.cs
@@ -1,5 +1,7 @@
global using Microsoft.Extensions.Configuration;
global using Microsoft.Extensions.DependencyInjection;
+global using Microsoft.Extensions.Logging;
+global using System.Text;
global using BotSharp.Abstraction.Agents.Enums;
global using BotSharp.Abstraction.Plugins;
@@ -8,4 +10,24 @@
global using BotSharp.Abstraction.Instructs;
global using BotSharp.Abstraction.Instructs.Models;
-global using BotSharp.Abstraction.Rules;
\ No newline at end of file
+global using BotSharp.Abstraction.Agents.Models;
+global using BotSharp.Abstraction.Conversations;
+
+global using BotSharp.Abstraction.Infrastructures.MessageQueues;
+global using BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+global using BotSharp.Abstraction.Models;
+global using BotSharp.Abstraction.Repositories.Filters;
+global using BotSharp.Abstraction.Rules;
+global using BotSharp.Abstraction.Rules.Options;
+global using BotSharp.Abstraction.Rules.Models;
+global using BotSharp.Abstraction.Rules.Hooks;
+global using BotSharp.Abstraction.Utilities;
+global using BotSharp.Abstraction.Coding;
+global using BotSharp.Abstraction.Coding.Contexts;
+global using BotSharp.Abstraction.Coding.Enums;
+global using BotSharp.Abstraction.Coding.Models;
+global using BotSharp.Abstraction.Coding.Utils;
+global using BotSharp.Abstraction.Coding.Settings;
+global using BotSharp.Abstraction.Hooks;
+
+global using BotSharp.Core.Rules.Constants;
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs b/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs
index f0ec11c23..6995b49ec 100644
--- a/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs
+++ b/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs
@@ -195,7 +195,7 @@ public async Task Execute(
foreach (var hook in hooks)
{
await hook.AfterCompletion(agent, instructResult);
- await hook.AfterCodeExecution(agent, codeExecution);
+ await hook.AfterCodeExecution(agent, context, codeExecution);
}
return instructResult;
diff --git a/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs b/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs
new file mode 100644
index 000000000..5c84fcb63
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs
@@ -0,0 +1,18 @@
+using BotSharp.Abstraction.Infrastructures.MessageQueues;
+using Microsoft.Extensions.Configuration;
+
+namespace BotSharp.Core.Messaging;
+
+public class MessagingPlugin : IBotSharpPlugin
+{
+ public string Id => "52a0aa30-4820-42a9-9cae-df0be81bad2b";
+ public string Name => "Messaging";
+ public string Description => "Provides message queue services.";
+
+ public void RegisterDI(IServiceCollection services, IConfiguration config)
+ {
+ var mqSettings = new MessageQueueSettings();
+ config.Bind("MessageQueue", mqSettings);
+ services.AddSingleton(mqSettings);
+ }
+}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/agent.json b/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/agent.json
index e958bcb31..f09d98554 100644
--- a/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/agent.json
+++ b/src/Infrastructure/BotSharp.Core/data/agents/01e2fc5c-2c89-4ec7-8470-7688608b496c/agent.json
@@ -14,5 +14,10 @@
"model": "gpt-5-mini",
"max_recursion_depth": 3,
"reasoning_effort_level": "minimal"
- }
+ },
+ "rules": [
+ {
+ "trigger_name": "DemoRuleTrigger"
+ }
+ ]
}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs b/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs
index 6ef8c5935..117feaebe 100644
--- a/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs
+++ b/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs
@@ -1,9 +1,11 @@
+using BotSharp.Abstraction.Coding.Contexts;
using BotSharp.Abstraction.Coding.Models;
using BotSharp.Abstraction.Instructs.Models;
using BotSharp.Abstraction.Instructs.Settings;
using BotSharp.Abstraction.Loggers.Models;
using BotSharp.Abstraction.Users;
using BotSharp.Abstraction.Utilities;
+using System;
namespace BotSharp.Logger.Hooks;
@@ -61,7 +63,7 @@ await db.SaveInstructionLogs(new List
await base.OnResponseGenerated(response);
}
- public override async Task AfterCodeExecution(Agent agent, CodeExecutionResponseModel response)
+ public override async Task AfterCodeExecution(Agent agent, CodeExecutionContext context, CodeExecutionResponseModel response)
{
if (response == null || !IsLoggingEnabled(agent?.Id))
{
@@ -88,7 +90,7 @@ await db.SaveInstructionLogs(new List
}
});
- await base.AfterCodeExecution(agent, response);
+ await base.AfterCodeExecution(agent, context, response);
}
private bool IsLoggingEnabled(string? agentId)
diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs
index 52fd719fd..43cf228c4 100644
--- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs
+++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs
@@ -1,5 +1,5 @@
-using BotSharp.Abstraction.Agents.Models;
using BotSharp.Abstraction.Rules;
+using BotSharp.Abstraction.Rules.Models;
namespace BotSharp.OpenAPI.Controllers;
@@ -18,9 +18,22 @@ public IEnumerable GetRuleTriggers()
}).OrderBy(x => x.TriggerName);
}
- [HttpGet("/rule/formalization")]
- public async Task GetFormalizedRuleDefinition([FromBody] AgentRule rule)
+ [HttpGet("/rule/config/options")]
+ public async Task> GetRuleConfigOptions()
{
- return "{}";
+ var dict = new Dictionary();
+ var flows = _services.GetServices>();
+
+ foreach (var flow in flows)
+ {
+ var config = await flow.GetTopologyConfigAsync();
+ if (string.IsNullOrEmpty(config.TopologyName))
+ {
+ continue;
+ }
+ dict[config.TopologyName] = config;
+ }
+
+ return dict;
}
}
diff --git a/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs b/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs
index be189898e..8e29bb1bb 100644
--- a/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs
+++ b/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs
@@ -84,7 +84,7 @@ private async Task SendRequest(string url, GraphQueryRequest r
}
catch (Exception ex)
{
- _logger.LogError(ex, $"Error when fetching Lessen GLM response (Endpoint: {url}).");
+ _logger.LogError(ex, $"Error when fetching {Provider} Graph db response (Endpoint: {url}).");
return result;
}
}
diff --git a/src/Plugins/BotSharp.Plugin.Membase/Controllers/MembaseController.cs b/src/Plugins/BotSharp.Plugin.Membase/Controllers/MembaseController.cs
index dbc7393a8..52f3968dd 100644
--- a/src/Plugins/BotSharp.Plugin.Membase/Controllers/MembaseController.cs
+++ b/src/Plugins/BotSharp.Plugin.Membase/Controllers/MembaseController.cs
@@ -1,4 +1,5 @@
using BotSharp.Abstraction.Graph;
+using BotSharp.Plugin.Membase.Interfaces;
using Microsoft.AspNetCore.Http;
namespace BotSharp.Plugin.Membase.Controllers;
@@ -8,11 +9,46 @@ namespace BotSharp.Plugin.Membase.Controllers;
public class MembaseController : ControllerBase
{
private readonly IServiceProvider _services;
+ private readonly IMembaseApi _membaseApi;
public MembaseController(
- IServiceProvider services)
+ IServiceProvider services,
+ IMembaseApi membaseApi)
{
_services = services;
+ _membaseApi = membaseApi;
+ }
+
+ ///
+ /// Get graph information
+ ///
+ /// The graph identifier
+ /// Graph information
+#if DEBUG
+ [AllowAnonymous]
+#endif
+ [HttpGet("/membase/{graphId}")]
+ [ProducesResponseType(StatusCodes.Status200OK)]
+ [ProducesResponseType(StatusCodes.Status400BadRequest)]
+ [ProducesResponseType(StatusCodes.Status500InternalServerError)]
+ public async Task GetGraphInfo(string graphId)
+ {
+ if (string.IsNullOrWhiteSpace(graphId))
+ {
+ return BadRequest("Graph ID cannot be empty.");
+ }
+
+ try
+ {
+ var graphInfo = await _membaseApi.GetGraphInfoAsync(graphId);
+ return Ok(graphInfo);
+ }
+ catch (Exception ex)
+ {
+ return StatusCode(
+ StatusCodes.Status500InternalServerError,
+ new { message = "An error occurred while retrieving graph information.", error = ex.Message });
+ }
}
///
@@ -21,6 +57,9 @@ public MembaseController(
/// The graph identifier
/// The Cypher query request containing the query and parameters
/// Query result with columns and data
+#if DEBUG
+ [AllowAnonymous]
+#endif
[HttpPost("/membase/{graphId}/query")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(StatusCodes.Status400BadRequest)]
@@ -58,4 +97,323 @@ public async Task ExecuteGraphQuery(string graphId, [FromBody] Cy
new { message = "An error occurred while executing the query.", error = ex.Message });
}
}
+
+ ///
+ /// Get a node from the graph
+ ///
+ /// The graph identifier
+ /// The node identifier
+ /// The node
+#if DEBUG
+ [AllowAnonymous]
+#endif
+ [HttpGet("/membase/{graphId}/node/{nodeId}")]
+ [ProducesResponseType(StatusCodes.Status200OK)]
+ [ProducesResponseType(StatusCodes.Status400BadRequest)]
+ [ProducesResponseType(StatusCodes.Status500InternalServerError)]
+ public async Task GetNode(string graphId, string nodeId)
+ {
+ if (string.IsNullOrWhiteSpace(graphId))
+ {
+ return BadRequest("Graph ID cannot be empty.");
+ }
+
+ if (string.IsNullOrWhiteSpace(nodeId))
+ {
+ return BadRequest("Node ID cannot be empty.");
+ }
+
+ try
+ {
+ var node = await _membaseApi.GetNodeAsync(graphId, nodeId);
+ return Ok(node);
+ }
+ catch (Exception ex)
+ {
+ return StatusCode(
+ StatusCodes.Status500InternalServerError,
+ new { message = "An error occurred while retrieving the node.", error = ex.Message });
+ }
+ }
+
+ ///
+ /// Create a node in the graph
+ ///
+ /// The graph identifier
+ /// The node creation model
+ /// The created node
+#if DEBUG
+ [AllowAnonymous]
+#endif
+ [HttpPost("/membase/{graphId}/node")]
+ [ProducesResponseType(StatusCodes.Status200OK)]
+ [ProducesResponseType(StatusCodes.Status400BadRequest)]
+ [ProducesResponseType(StatusCodes.Status500InternalServerError)]
+ public async Task CreateNode(string graphId, [FromBody] NodeCreationModel request)
+ {
+ if (string.IsNullOrWhiteSpace(graphId))
+ {
+ return BadRequest("Graph ID cannot be empty.");
+ }
+
+ if (request == null)
+ {
+ return BadRequest("Node creation model cannot be null.");
+ }
+
+ try
+ {
+ var node = await _membaseApi.CreateNodeAsync(graphId, request);
+ return Ok(node);
+ }
+ catch (Exception ex)
+ {
+ return StatusCode(
+ StatusCodes.Status500InternalServerError,
+ new { message = "An error occurred while creating the node.", error = ex.Message });
+ }
+ }
+
+ ///
+ /// Merge a node in the graph
+ ///
+ /// The graph identifier
+ /// The node update model
+ /// The merged node
+#if DEBUG
+ [AllowAnonymous]
+#endif
+ [HttpPut("/membase/{graphId}/node/merge")]
+ [ProducesResponseType(StatusCodes.Status200OK)]
+ [ProducesResponseType(StatusCodes.Status400BadRequest)]
+ [ProducesResponseType(StatusCodes.Status500InternalServerError)]
+ public async Task MergeNode(string graphId, [FromBody] NodeUpdateModel request)
+ {
+ if (string.IsNullOrWhiteSpace(graphId))
+ {
+ return BadRequest("Graph ID cannot be empty.");
+ }
+
+ if (string.IsNullOrWhiteSpace(request?.Id))
+ {
+ return BadRequest("Node ID cannot be empty.");
+ }
+
+ try
+ {
+ var node = await _membaseApi.MergeNodeAsync(graphId, request.Id, request);
+ return Ok(node);
+ }
+ catch (Exception ex)
+ {
+ return StatusCode(
+ StatusCodes.Status500InternalServerError,
+ new { message = "An error occurred while merging the node.", error = ex.Message });
+ }
+ }
+
+ ///
+ /// Delete a node from the graph
+ ///
+ /// The graph identifier
+ /// The node identifier
+ /// Delete response
+#if DEBUG
+ [AllowAnonymous]
+#endif
+ [HttpDelete("/membase/{graphId}/node/{nodeId}")]
+ [ProducesResponseType(StatusCodes.Status200OK)]
+ [ProducesResponseType(StatusCodes.Status400BadRequest)]
+ [ProducesResponseType(StatusCodes.Status500InternalServerError)]
+ public async Task DeleteNode(string graphId, string nodeId)
+ {
+ if (string.IsNullOrWhiteSpace(graphId))
+ {
+ return BadRequest("Graph ID cannot be empty.");
+ }
+
+ if (string.IsNullOrWhiteSpace(nodeId))
+ {
+ return BadRequest("Node ID cannot be empty.");
+ }
+
+ try
+ {
+ await _membaseApi.DeleteNodeAsync(graphId, nodeId);
+ return Ok("done");
+ }
+ catch (Exception ex)
+ {
+ return StatusCode(
+ StatusCodes.Status500InternalServerError,
+ new { message = "An error occurred while deleting the node.", error = ex.Message });
+ }
+ }
+
+ ///
+ /// Get an edge from the graph
+ ///
+ /// The graph identifier
+ /// The edge identifier
+ /// The edge
+#if DEBUG
+ [AllowAnonymous]
+#endif
+ [HttpGet("/membase/{graphId}/edge/{edgeId}")]
+ [ProducesResponseType(StatusCodes.Status200OK)]
+ [ProducesResponseType(StatusCodes.Status400BadRequest)]
+ [ProducesResponseType(StatusCodes.Status500InternalServerError)]
+ public async Task GetEdge(string graphId, string edgeId)
+ {
+ if (string.IsNullOrWhiteSpace(graphId))
+ {
+ return BadRequest("Graph ID cannot be empty.");
+ }
+
+ if (string.IsNullOrWhiteSpace(edgeId))
+ {
+ return BadRequest("Edge ID cannot be empty.");
+ }
+
+ try
+ {
+ var edge = await _membaseApi.GetEdgeAsync(graphId, edgeId);
+ return Ok(edge);
+ }
+ catch (Exception ex)
+ {
+ return StatusCode(
+ StatusCodes.Status500InternalServerError,
+ new { message = "An error occurred while retrieving the edge.", error = ex.Message });
+ }
+ }
+
+ ///
+ /// Create an edge in the graph
+ ///
+ /// The graph identifier
+ /// The edge creation model
+ /// The created edge
+#if DEBUG
+ [AllowAnonymous]
+#endif
+ [HttpPost("/membase/{graphId}/edge")]
+ [ProducesResponseType(StatusCodes.Status200OK)]
+ [ProducesResponseType(StatusCodes.Status400BadRequest)]
+ [ProducesResponseType(StatusCodes.Status500InternalServerError)]
+ public async Task CreateEdge(string graphId, [FromBody] EdgeCreationModel request)
+ {
+ if (string.IsNullOrWhiteSpace(graphId))
+ {
+ return BadRequest("Graph ID cannot be empty.");
+ }
+
+ if (request == null)
+ {
+ return BadRequest("Edge creation model cannot be null.");
+ }
+
+ if (string.IsNullOrWhiteSpace(request.SourceNodeId))
+ {
+ return BadRequest("Source node ID cannot be empty.");
+ }
+
+ if (string.IsNullOrWhiteSpace(request.TargetNodeId))
+ {
+ return BadRequest("Target node ID cannot be empty.");
+ }
+
+ if (string.IsNullOrWhiteSpace(request.Type))
+ {
+ return BadRequest("Edge type cannot be empty.");
+ }
+
+ try
+ {
+ var edge = await _membaseApi.CreateEdgeAsync(graphId, request);
+ return Ok(edge);
+ }
+ catch (Exception ex)
+ {
+ return StatusCode(
+ StatusCodes.Status500InternalServerError,
+ new { message = "An error occurred while creating the edge.", error = ex.Message });
+ }
+ }
+
+ ///
+ /// Update an edge in the graph
+ ///
+ /// The graph identifier
+ /// The edge update model
+ /// The updated edge
+#if DEBUG
+ [AllowAnonymous]
+#endif
+ [HttpPut("/membase/{graphId}/edge")]
+ [ProducesResponseType(StatusCodes.Status200OK)]
+ [ProducesResponseType(StatusCodes.Status400BadRequest)]
+ [ProducesResponseType(StatusCodes.Status500InternalServerError)]
+ public async Task UpdateEdge(string graphId, [FromBody] EdgeUpdateModel request)
+ {
+ if (string.IsNullOrWhiteSpace(graphId))
+ {
+ return BadRequest("Graph ID cannot be empty.");
+ }
+
+ if (string.IsNullOrWhiteSpace(request?.Id))
+ {
+ return BadRequest("Edge ID cannot be empty.");
+ }
+
+ try
+ {
+ var edge = await _membaseApi.UpdateEdgeAsync(graphId, request.Id, request);
+ return Ok(edge);
+ }
+ catch (Exception ex)
+ {
+ return StatusCode(
+ StatusCodes.Status500InternalServerError,
+ new { message = "An error occurred while updating the edge.", error = ex.Message });
+ }
+ }
+
+ ///
+ /// Delete an edge from the graph
+ ///
+ /// The graph identifier
+ /// The edge identifier
+ /// Delete response
+#if DEBUG
+ [AllowAnonymous]
+#endif
+ [HttpDelete("/membase/{graphId}/edge/{edgeId}")]
+ [ProducesResponseType(StatusCodes.Status200OK)]
+ [ProducesResponseType(StatusCodes.Status400BadRequest)]
+ [ProducesResponseType(StatusCodes.Status500InternalServerError)]
+ public async Task DeleteEdge(string graphId, string edgeId)
+ {
+ if (string.IsNullOrWhiteSpace(graphId))
+ {
+ return BadRequest("Graph ID cannot be empty.");
+ }
+
+ if (string.IsNullOrWhiteSpace(edgeId))
+ {
+ return BadRequest("Edge ID cannot be empty.");
+ }
+
+ try
+ {
+ await _membaseApi.DeleteEdgeAsync(graphId, edgeId);
+ return Ok("done");
+ }
+ catch (Exception ex)
+ {
+ return StatusCode(
+ StatusCodes.Status500InternalServerError,
+ new { message = "An error occurred while deleting the edge.", error = ex.Message });
+ }
+ }
}
diff --git a/src/Plugins/BotSharp.Plugin.Membase/GraphDb/MembaseGraphDb.cs b/src/Plugins/BotSharp.Plugin.Membase/GraphDb/MembaseGraphDb.cs
index 1bcca821d..30209eba5 100644
--- a/src/Plugins/BotSharp.Plugin.Membase/GraphDb/MembaseGraphDb.cs
+++ b/src/Plugins/BotSharp.Plugin.Membase/GraphDb/MembaseGraphDb.cs
@@ -4,6 +4,7 @@
using BotSharp.Abstraction.Models;
using BotSharp.Abstraction.Options;
using BotSharp.Abstraction.Utilities;
+using BotSharp.Plugin.Membase.Interfaces;
using Microsoft.Extensions.Logging;
using System.Text.Json;
@@ -38,7 +39,7 @@ public async Task ExecuteQueryAsync(string query, GraphQueryEx
try
{
- var response = await _membaseApi.CypherQueryAsync(options.GraphId, new CypherQueryRequest
+ var response = await _membaseApi.CypherQueryAsync(options!.GraphId, new CypherQueryRequest
{
Query = query,
Parameters = args
diff --git a/src/Plugins/BotSharp.Plugin.Membase/Services/MembaseAuthHandler.cs b/src/Plugins/BotSharp.Plugin.Membase/Handlers/MembaseAuthHandler.cs
similarity index 95%
rename from src/Plugins/BotSharp.Plugin.Membase/Services/MembaseAuthHandler.cs
rename to src/Plugins/BotSharp.Plugin.Membase/Handlers/MembaseAuthHandler.cs
index 7dea503c8..c7b511f59 100644
--- a/src/Plugins/BotSharp.Plugin.Membase/Services/MembaseAuthHandler.cs
+++ b/src/Plugins/BotSharp.Plugin.Membase/Handlers/MembaseAuthHandler.cs
@@ -2,7 +2,7 @@
using System.Net.Http;
using System.Threading;
-namespace BotSharp.Plugin.Membase.Services;
+namespace BotSharp.Plugin.Membase.Handlers;
public class MembaseAuthHandler : DelegatingHandler
{
diff --git a/src/Plugins/BotSharp.Plugin.Membase/Services/IMembaseApi.cs b/src/Plugins/BotSharp.Plugin.Membase/Interfaces/IMembaseApi.cs
similarity index 87%
rename from src/Plugins/BotSharp.Plugin.Membase/Services/IMembaseApi.cs
rename to src/Plugins/BotSharp.Plugin.Membase/Interfaces/IMembaseApi.cs
index 85c0b3b17..1c294060a 100644
--- a/src/Plugins/BotSharp.Plugin.Membase/Services/IMembaseApi.cs
+++ b/src/Plugins/BotSharp.Plugin.Membase/Interfaces/IMembaseApi.cs
@@ -1,7 +1,7 @@
using BotSharp.Plugin.Membase.Models.Graph;
using Refit;
-namespace BotSharp.Plugin.Membase.Services;
+namespace BotSharp.Plugin.Membase.Interfaces;
///
/// Membase REST API interface
@@ -29,7 +29,7 @@ public interface IMembaseApi
Task MergeNodeAsync(string graphId, string nodeId, [Body] NodeUpdateModel node);
[Delete("/graph/{graphId}/node/{nodeId}")]
- Task DeleteNodeAsync(string graphId, string nodeId);
+ Task DeleteNodeAsync(string graphId, string nodeId);
#endregion
#region Edge
@@ -43,6 +43,6 @@ public interface IMembaseApi
Task UpdateEdgeAsync(string graphId, string edgeId, [Body] EdgeUpdateModel edge);
[Delete("/graph/{graphId}/edge/{edgeId}")]
- Task DeleteEdgeAsync(string graphId, string edgeId);
+ Task DeleteEdgeAsync(string graphId, string edgeId);
#endregion
}
diff --git a/src/Plugins/BotSharp.Plugin.Membase/MembasePlugin.cs b/src/Plugins/BotSharp.Plugin.Membase/MembasePlugin.cs
index 32e06d2c7..87c6156db 100644
--- a/src/Plugins/BotSharp.Plugin.Membase/MembasePlugin.cs
+++ b/src/Plugins/BotSharp.Plugin.Membase/MembasePlugin.cs
@@ -1,6 +1,9 @@
using BotSharp.Abstraction.Graph;
using BotSharp.Abstraction.Plugins.Models;
+using BotSharp.Abstraction.Rules;
using BotSharp.Plugin.Membase.GraphDb;
+using BotSharp.Plugin.Membase.Handlers;
+using BotSharp.Plugin.Membase.Interfaces;
using Refit;
namespace BotSharp.Plugin.Membase;
@@ -37,6 +40,10 @@ public void RegisterDI(IServiceCollection services, IConfiguration config)
_membaseCredential = config.GetValue("Membase:ApiKey") ?? string.Empty;
_membaseProjectId = config.GetValue("Membase:ProjectId") ?? string.Empty;
+
+#if DEBUG
+ services.AddScoped, DemoRuleGraph>();
+#endif
}
public bool AttachMenu(List menu)
diff --git a/src/Plugins/BotSharp.Plugin.Membase/Models/Requests/EdgeUpdateModel.cs b/src/Plugins/BotSharp.Plugin.Membase/Models/Requests/EdgeUpdateModel.cs
index 9cc80123f..c660f485d 100644
--- a/src/Plugins/BotSharp.Plugin.Membase/Models/Requests/EdgeUpdateModel.cs
+++ b/src/Plugins/BotSharp.Plugin.Membase/Models/Requests/EdgeUpdateModel.cs
@@ -2,5 +2,6 @@ namespace BotSharp.Plugin.Membase.Models;
public class EdgeUpdateModel
{
+ public string? Id { get; set; }
public object? Properties { get; set; }
}
diff --git a/src/Plugins/BotSharp.Plugin.Membase/Services/DemoRuleGraph.cs b/src/Plugins/BotSharp.Plugin.Membase/Services/DemoRuleGraph.cs
new file mode 100644
index 000000000..105260448
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.Membase/Services/DemoRuleGraph.cs
@@ -0,0 +1,322 @@
+using BotSharp.Abstraction.Graph;
+using BotSharp.Abstraction.Graph.Models;
+using BotSharp.Abstraction.Rules;
+using BotSharp.Abstraction.Rules.Models;
+using BotSharp.Abstraction.Rules.Options;
+using BotSharp.Abstraction.Utilities;
+using Microsoft.Extensions.Logging;
+using System.Text.Json;
+
+namespace BotSharp.Plugin.Membase.Services;
+
+public class DemoRuleGraph : IRuleFlow
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+
+ public DemoRuleGraph(
+ IServiceProvider services,
+ ILogger logger)
+ {
+ _services = services;
+ _logger = logger;
+ }
+
+ public string Name => "One Flow";
+
+ public async Task GetTopologyConfigAsync(RuleFlowConfigOptions? options = null)
+ {
+ var settings = _services.GetRequiredService();
+ var apiKey = settings.ApiKey;
+ var projectId = settings.ProjectId;
+
+ var topologyName = Name;
+ if (!string.IsNullOrEmpty(options?.TopologyName))
+ {
+ topologyName = options.TopologyName;
+ }
+
+ var foundInstance = settings.GraphInstances?.FirstOrDefault(x => x.Name.IsEqualTo(topologyName));
+ var graphId = foundInstance?.Id ?? string.Empty;
+ var query = Uri.EscapeDataString("MATCH (a)-[r]->(b) WITH a, r, b WHERE a.agent = $agent AND a.trigger = $trigger AND b.agent = $agent AND b.trigger = $trigger RETURN a, r, b LIMIT 100");
+
+ return new RuleConfigModel
+ {
+ TopologyId = graphId,
+ TopologyName = foundInstance?.Name,
+ CustomParameters = JsonDocument.Parse(JsonSerializer.Serialize(new
+ {
+ htmlTag = "iframe",
+ appendParameterName = "parameters",
+ url = $"https://console.membase.dev/query-editor/{projectId}?graphId={graphId}&query={query}&token={apiKey}"
+ }))
+ };
+ }
+
+ public async Task GetTopologyAsync(string id, RuleFlowLoadOptions? options = null)
+ {
+ if (string.IsNullOrEmpty(id))
+ {
+#if DEBUG
+ return GetDefaultGraph();
+#else
+ return null;
+#endif
+ }
+
+ var query = options?.Query ?? string.Empty;
+ if (string.IsNullOrEmpty(query))
+ {
+ query = $"""
+ MATCH (a)-[r]->(b)
+ WITH a, r, b
+ WHERE a.agent = $agent AND a.trigger = $trigger AND b.agent = $agent AND b.trigger = $trigger
+ RETURN a, r, b
+ LIMIT 100
+ """;
+ }
+
+ var args = new Dictionary();
+ if (options?.Parameters != null)
+ {
+ foreach (var param in options.Parameters!)
+ {
+ if (param.Key == null || param.Value == null)
+ {
+ continue;
+ }
+ args[param.Key] = param.Value;
+ }
+ }
+
+ try
+ {
+ var graphDb = _services.GetServices().First(x => x.Provider.IsEqualTo("membase"));
+ var result = await graphDb.ExecuteQueryAsync(query, options: new()
+ {
+ GraphId = id,
+ Arguments = args
+ });
+
+ if (result == null)
+ {
+ return null;
+ }
+
+ var graph = BuildGraph(result);
+ return graph;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error when loading graph (id: {GraphId})", id);
+ return null;
+ }
+ }
+
+ private RuleGraph BuildGraph(GraphQueryResult result)
+ {
+ var graph = RuleGraph.Init();
+ if (result.Values.IsNullOrEmpty())
+ {
+ return graph;
+ }
+
+ foreach (var item in result.Values)
+ {
+ // Try to deserialize nodes and edge from the dictionary
+ if (!item.TryGetValue("a", out var sourceNodeElement) ||
+ !item.TryGetValue("b", out var targetNodeElement) ||
+ !item.TryGetValue("r", out var edgeElement))
+ {
+ continue;
+ }
+
+ // Parse source node
+ var sourceNodeId = sourceNodeElement.GetProperty("id").GetString();
+ var sourceNodeLabels = sourceNodeElement.TryGetProperty("labels", out var sLabels)
+ ? sLabels.EnumerateArray().Select(x => x.GetString() ?? "").ToList()
+ : [];
+ var sourceNodeProps = sourceNodeElement.TryGetProperty("properties", out var sProps)
+ ? sProps
+ : default;
+ var sourceNodeWeight = sourceNodeElement.TryGetProperty("weight", out var sNodeWeight) && sNodeWeight.ValueKind == JsonValueKind.Number
+ ? sNodeWeight.GetDouble()
+ : 1.0;
+
+ // Parse target node
+ var targetNodeId = targetNodeElement.GetProperty("id").GetString();
+ var targetNodeLabels = targetNodeElement.TryGetProperty("labels", out var tLabels)
+ ? tLabels.EnumerateArray().Select(x => x.GetString() ?? "").ToList()
+ : [];
+ var targetNodeProps = targetNodeElement.TryGetProperty("properties", out var tProps)
+ ? tProps
+ : default;
+ var targetNodeWeight = targetNodeElement.TryGetProperty("weight", out var tNodeWeight) && tNodeWeight.ValueKind == JsonValueKind.Number
+ ? tNodeWeight.GetDouble()
+ : 1.0;
+
+ // Parse edge
+ var edgeId = edgeElement.GetProperty("id").GetString();
+ var edgeProps = edgeElement.TryGetProperty("properties", out var eProps)
+ ? eProps
+ : default;
+ var edgeWeight = edgeElement.TryGetProperty("weight", out var eWeight) && eWeight.ValueKind == JsonValueKind.Number
+ ? eWeight.GetDouble()
+ : 1.0;
+
+ // Create source node
+ var sourceNode = new RuleNode()
+ {
+ Id = sourceNodeId ?? Guid.NewGuid().ToString(),
+ Labels = sourceNodeLabels,
+ Weight = sourceNodeWeight,
+ Name = GetGraphItemAttribute(sourceNodeProps, key: "name", defaultValue: "node"),
+ Type = GetGraphItemAttribute(sourceNodeProps, key: "type", defaultValue: "action"),
+ Purpose = GetGraphItemAttribute(sourceNodeProps, key: "purpose", defaultValue: "unknown"),
+ Config = GetConfig(sourceNodeProps)
+ };
+
+ // Create target node
+ var targetNode = new RuleNode()
+ {
+ Id = targetNodeId ?? Guid.NewGuid().ToString(),
+ Labels = targetNodeLabels,
+ Weight = targetNodeWeight,
+ Name = GetGraphItemAttribute(targetNodeProps, key: "name", defaultValue: "node"),
+ Type = GetGraphItemAttribute(targetNodeProps, key: "type", defaultValue: "action"),
+ Purpose = GetGraphItemAttribute(targetNodeProps, key: "purpose", defaultValue: "unknown"),
+ Config = GetConfig(targetNodeProps)
+ };
+
+ // Create edge payload
+ var edgePayload = new GraphItemPayload()
+ {
+ Id = edgeId ?? Guid.NewGuid().ToString(),
+ Name = GetGraphItemAttribute(edgeProps, key: "name", defaultValue: "edge"),
+ Type = GetGraphItemAttribute(edgeProps, key: "type", defaultValue: "next"),
+ Purpose = GetGraphItemAttribute(edgeProps, key: "purpose", defaultValue: "unknown"),
+ Weight = edgeWeight,
+ Config = GetConfig(edgeProps)
+ };
+
+ // Add edge to graph
+ graph.AddEdge(sourceNode, targetNode, edgePayload);
+ }
+
+ return graph;
+ }
+
+ private string GetGraphItemAttribute(JsonElement? properties, string key, string defaultValue)
+ {
+ if (properties == null || properties.Value.ValueKind == JsonValueKind.Undefined)
+ {
+ return defaultValue;
+ }
+
+ if (properties.Value.TryGetProperty(key, out var name) && name.ValueKind == JsonValueKind.String)
+ {
+ return name.GetString() ?? defaultValue;
+ }
+
+ return defaultValue;
+ }
+
+ private Dictionary GetConfig(JsonElement? properties)
+ {
+ var config = new Dictionary();
+
+ if (properties == null || properties.Value.ValueKind == JsonValueKind.Undefined)
+ {
+ return config;
+ }
+
+ // Convert all properties to config dictionary
+ foreach (var prop in properties.Value.EnumerateObject())
+ {
+ config[prop.Name] = prop.Value.ConvertToString();
+ }
+
+ return config;
+ }
+
+ private RuleGraph GetDefaultGraph()
+ {
+ var graph = RuleGraph.Init();
+ var root = new RuleNode
+ {
+ Name = "start",
+ Type = "start",
+ };
+
+ var end = new RuleNode
+ {
+ Name = "end",
+ Type = "end",
+ };
+
+ var node1 = new RuleNode
+ {
+ Name = "http_request",
+ Type = "action",
+ Config = new()
+ {
+ ["http_method"] = "GET",
+ ["http_url"] = "https://dummy.restapiexample.com/api/v1/employees"
+ }
+ };
+
+ var node2 = new RuleNode
+ {
+ Name = "http_request",
+ Type = "action",
+ Config = new()
+ {
+ ["http_method"] = "GET",
+ ["http_url"] = "https://dummy.restapiexample.com/api/v1/employee/1"
+ }
+ };
+
+ var node3 = new RuleNode
+ {
+ Name = "http_request",
+ Type = "action",
+ Config = new()
+ {
+ ["http_method"] = "GET",
+ ["http_url"] = "https://dummy.restapiexample.com/api/v1/employee/2"
+ }
+ };
+
+ graph.AddEdge(root, node1, payload: new()
+ {
+ Name = "edge",
+ Type = "next"
+ });
+
+ graph.AddEdge(node1, node2, payload: new()
+ {
+ Name = "edge",
+ Type = "next"
+ });
+
+ graph.AddEdge(node1, node3, payload: new()
+ {
+ Name = "edge",
+ Type = "next"
+ });
+
+ graph.AddEdge(node2, node3, payload: new()
+ {
+ Name = "edge",
+ Type = "next"
+ });
+
+ graph.AddEdge(node3, end, payload: new()
+ {
+ Name = "edge",
+ Type = "next"
+ });
+
+ return graph;
+ }
+}
diff --git a/src/Plugins/BotSharp.Plugin.Membase/Services/MembaseService.cs b/src/Plugins/BotSharp.Plugin.Membase/Services/MembaseService.cs
index 380e9b703..d5f79cb1a 100644
--- a/src/Plugins/BotSharp.Plugin.Membase/Services/MembaseService.cs
+++ b/src/Plugins/BotSharp.Plugin.Membase/Services/MembaseService.cs
@@ -1,4 +1,5 @@
using BotSharp.Abstraction.Graph.Models;
+using BotSharp.Plugin.Membase.Interfaces;
using BotSharp.Plugin.Membase.Models.Graph;
namespace BotSharp.Plugin.Membase.Services;
diff --git a/src/Plugins/BotSharp.Plugin.Membase/Settings/MembaseSettings.cs b/src/Plugins/BotSharp.Plugin.Membase/Settings/MembaseSettings.cs
index 77bf58351..c1370746d 100644
--- a/src/Plugins/BotSharp.Plugin.Membase/Settings/MembaseSettings.cs
+++ b/src/Plugins/BotSharp.Plugin.Membase/Settings/MembaseSettings.cs
@@ -5,4 +5,23 @@ public class MembaseSettings
public string Host { get; set; } = "localhost";
public string ProjectId { get; set; } = string.Empty;
public string ApiKey { get; set; } = string.Empty;
+ public GraphInstance[] GraphInstances { get; set; } = [];
}
+
+public class GraphInstance
+{
+ ///
+ /// Graph id
+ ///
+ public string Id { get; set; }
+
+ ///
+ /// Graph name
+ ///
+ public string Name { get; set; }
+
+ ///
+ /// Graph description
+ ///
+ public string Description { get; set; }
+}
\ No newline at end of file
diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs
index 4205fdc46..b481394a2 100644
--- a/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs
+++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs
@@ -7,7 +7,7 @@ public class AgentRuleMongoElement
{
public string TriggerName { get; set; } = default!;
public bool Disabled { get; set; }
- public string Criteria { get; set; } = default!;
+ public RuleConfigMongoModel? Config { get; set; }
public static AgentRuleMongoElement ToMongoElement(AgentRule rule)
{
@@ -15,7 +15,7 @@ public static AgentRuleMongoElement ToMongoElement(AgentRule rule)
{
TriggerName = rule.TriggerName,
Disabled = rule.Disabled,
- Criteria = rule.Criteria
+ Config = RuleConfigMongoModel.ToMongoModel(rule.Config)
};
}
@@ -25,7 +25,39 @@ public static AgentRule ToDomainElement(AgentRuleMongoElement rule)
{
TriggerName = rule.TriggerName,
Disabled = rule.Disabled,
- Criteria = rule.Criteria
+ Config = RuleConfigMongoModel.ToDomainModel(rule.Config)
+ };
+ }
+}
+
+[BsonIgnoreExtraElements(Inherited = true)]
+public class RuleConfigMongoModel
+{
+ public string? TopologyName { get; set; }
+
+ public static RuleConfigMongoModel? ToMongoModel(RuleConfig? config)
+ {
+ if (config == null)
+ {
+ return null;
+ }
+
+ return new RuleConfigMongoModel
+ {
+ TopologyName = config.TopologyName
+ };
+ }
+
+ public static RuleConfig? ToDomainModel(RuleConfigMongoModel? config)
+ {
+ if (config == null)
+ {
+ return null;
+ }
+
+ return new RuleConfig
+ {
+ TopologyName = config.TopologyName
};
}
}
diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs
index 8a176a3fd..a003b3833 100644
--- a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs
+++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs
@@ -648,8 +648,7 @@ public async Task> TruncateConversation(string conversationId, stri
continue;
}
- var values = state.Values.Where(x => x.MessageId != messageId)
- .Where(x => x.UpdateTime < refTime)
+ var values = state.Values.Where(x => x.MessageId != messageId && x.UpdateTime < refTime)
.ToList();
if (values.Count == 0) continue;
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj b/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj
new file mode 100644
index 000000000..4a8f3ff20
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj
@@ -0,0 +1,22 @@
+
+
+
+ $(TargetFramework)
+ enable
+ $(LangVersion)
+ $(BotSharpVersion)
+ $(GeneratePackageOnBuild)
+ $(GenerateDocumentationFile)
+ $(SolutionDir)packages
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs
new file mode 100644
index 000000000..81c7de270
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs
@@ -0,0 +1,73 @@
+using Microsoft.Extensions.ObjectPool;
+using RabbitMQ.Client;
+
+namespace BotSharp.Plugin.RabbitMQ.Connections;
+
+public class RabbitMQChannelPool
+{
+ private readonly ObjectPool _pool;
+ private readonly ILogger _logger;
+ private readonly int _tryLimit = 3;
+
+ public RabbitMQChannelPool(
+ IServiceProvider services,
+ IRabbitMQConnection mqConnection)
+ {
+ _logger = services.GetRequiredService().CreateLogger();
+ var poolProvider = new DefaultObjectPoolProvider();
+ var policy = new ChannelPoolPolicy(mqConnection.Connection);
+ _pool = poolProvider.Create(policy);
+ }
+
+ public IChannel Get()
+ {
+ var count = 0;
+ var channel = _pool.Get();
+
+ while (count < _tryLimit && channel.IsClosed)
+ {
+ channel.Dispose();
+ channel = _pool.Get();
+ count++;
+ }
+
+ if (channel.IsClosed)
+ {
+ _logger.LogWarning($"No open channel from the pool after {_tryLimit} retries.");
+ }
+
+ return channel;
+ }
+
+ public void Return(IChannel channel)
+ {
+ if (channel.IsOpen)
+ {
+ _pool.Return(channel);
+ }
+ else
+ {
+ channel.Dispose();
+ }
+ }
+}
+
+internal class ChannelPoolPolicy : IPooledObjectPolicy
+{
+ private readonly IConnection _connection;
+
+ public ChannelPoolPolicy(IConnection connection)
+ {
+ _connection = connection;
+ }
+
+ public IChannel Create()
+ {
+ return _connection.CreateChannelAsync().ConfigureAwait(false).GetAwaiter().GetResult();
+ }
+
+ public bool Return(IChannel obj)
+ {
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs
new file mode 100644
index 000000000..989c0a7b7
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs
@@ -0,0 +1,13 @@
+using System.Collections.Concurrent;
+
+namespace BotSharp.Plugin.RabbitMQ.Connections;
+
+public static class RabbitMQChannelPoolFactory
+{
+ private static readonly ConcurrentDictionary _poolDict = new();
+
+ public static RabbitMQChannelPool GetChannelPool(IServiceProvider services, IRabbitMQConnection rabbitMQConnection)
+ {
+ return _poolDict.GetOrAdd(rabbitMQConnection.Connection.ToString()!, key => new RabbitMQChannelPool(services, rabbitMQConnection));
+ }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs
new file mode 100644
index 000000000..dac9e8c07
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs
@@ -0,0 +1,154 @@
+using Polly;
+using Polly.Retry;
+using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
+using System.Threading;
+
+namespace BotSharp.Plugin.RabbitMQ.Connections;
+
+public class RabbitMQConnection : IRabbitMQConnection
+{
+ private readonly RabbitMQSettings _settings;
+ private readonly IConnectionFactory _connectionFactory;
+ private readonly SemaphoreSlim _lock = new(initialCount: 1, maxCount: 1);
+ private readonly ILogger _logger;
+ private readonly int _retryCount = 5;
+
+ private IConnection _connection;
+ private bool _disposed = false;
+
+ public RabbitMQConnection(
+ RabbitMQSettings settings,
+ ILogger logger)
+ {
+ _settings = settings;
+ _logger = logger;
+ _connectionFactory = new ConnectionFactory
+ {
+ HostName = settings.HostName,
+ Port = settings.Port,
+ UserName = settings.UserName,
+ Password = settings.Password,
+ VirtualHost = settings.VirtualHost,
+ ConsumerDispatchConcurrency = 1,
+ AutomaticRecoveryEnabled = true,
+ HandshakeContinuationTimeout = TimeSpan.FromSeconds(20)
+ };
+ }
+
+ public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed;
+
+ public IConnection Connection => _connection;
+
+ public async Task CreateChannelAsync()
+ {
+ if (!IsConnected)
+ {
+ throw new InvalidOperationException("Rabbit MQ is not connectioned.");
+ }
+ return await _connection.CreateChannelAsync();
+ }
+
+ public async Task ConnectAsync()
+ {
+ await _lock.WaitAsync();
+
+ try
+ {
+ if (IsConnected)
+ {
+ return true;
+ }
+
+ var policy = BuildRetryPolicy();
+ await policy.Execute(async () =>
+ {
+ _connection = await _connectionFactory.CreateConnectionAsync();
+ });
+
+ if (IsConnected)
+ {
+ _connection.ConnectionShutdownAsync += OnConnectionShutdownAsync;
+ _connection.CallbackExceptionAsync += OnCallbackExceptionAsync;
+ _connection.ConnectionBlockedAsync += OnConnectionBlockedAsync;
+ _logger.LogInformation($"Rabbit MQ client connection success. host: {_connection.Endpoint.HostName}, port: {_connection.Endpoint.Port}, localPort:{_connection.LocalPort}");
+ return true;
+ }
+ _logger.LogError("Rabbit MQ client connection error.");
+ return false;
+ }
+ finally
+ {
+ _lock.Release();
+ }
+
+ }
+
+ private RetryPolicy BuildRetryPolicy()
+ {
+ return Policy.Handle().WaitAndRetry(
+ _retryCount,
+ retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
+ (ex, time) =>
+ {
+ _logger.LogError(ex, $"RabbitMQ cannot build connection: after {time.TotalSeconds:n1}s");
+ });
+ }
+
+ private Task OnConnectionShutdownAsync(object sender, ShutdownEventArgs e)
+ {
+ if (_disposed)
+ {
+ return Task.CompletedTask;
+ }
+
+ _logger.LogError($"Rabbit MQ connection is shutdown. {e}.");
+ return Task.CompletedTask;
+ }
+
+ private Task OnCallbackExceptionAsync(object sender, CallbackExceptionEventArgs e)
+ {
+ if (_disposed)
+ {
+ return Task.CompletedTask;
+ }
+
+ _logger.LogError($"Rabbit MQ connection throw exception. Trying to reconnect, {e.Exception}.");
+ return Task.CompletedTask;
+ }
+
+ private Task OnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs e)
+ {
+ if (_disposed)
+ {
+ return Task.CompletedTask;
+ }
+
+ _logger.LogError($"Rabbit MQ connection is shutdown. Trying to reconnect, {e.Reason}.");
+ return Task.CompletedTask;
+ }
+
+
+ public void Dispose()
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ _logger.LogWarning("Start disposing Rabbit MQ connection.");
+
+ try
+ {
+ _connection.Dispose();
+ _disposed = true;
+ _logger.LogWarning("Disposed Rabbit MQ connection.");
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error when disposing Rabbit MQ connection");
+ }
+
+ GC.SuppressFinalize(this);
+ }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs
new file mode 100644
index 000000000..36af0df90
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs
@@ -0,0 +1,24 @@
+namespace BotSharp.Plugin.RabbitMQ.Consumers;
+
+public class DummyMessageConsumer : MQConsumerBase
+{
+ public override object Config => new RabbitMQConsumerConfig
+ {
+ ExchangeName = "my.exchange",
+ QueueName = "dummy.queue",
+ RoutingKey = "my.routing"
+ };
+
+ public DummyMessageConsumer(
+ IServiceProvider services,
+ ILogger logger)
+ : base(services, logger)
+ {
+ }
+
+ public override async Task HandleMessageAsync(string channel, string data)
+ {
+ _logger.LogCritical($"Received delayed dummy message data: {data}");
+ return await Task.FromResult(true);
+ }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs
new file mode 100644
index 000000000..f6040dcd7
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs
@@ -0,0 +1,25 @@
+namespace BotSharp.Plugin.RabbitMQ.Consumers;
+
+public class ScheduledMessageConsumer : MQConsumerBase
+{
+ public override object Config => new RabbitMQConsumerConfig
+ {
+ ExchangeName = "my.exchange",
+ QueueName = "scheduled.queue",
+ RoutingKey = "my.routing"
+ };
+
+ public ScheduledMessageConsumer(
+ IServiceProvider services,
+ ILogger logger)
+ : base(services, logger)
+ {
+ }
+
+ public override async Task HandleMessageAsync(string channel, string data)
+ {
+ _logger.LogCritical($"Received delayed scheduled message data: {data}");
+ return await Task.FromResult(true);
+ }
+}
+
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs
new file mode 100644
index 000000000..802e4fa1b
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs
@@ -0,0 +1,89 @@
+using Microsoft.AspNetCore.Authorization;
+using Microsoft.AspNetCore.Http;
+using Microsoft.AspNetCore.Mvc;
+
+namespace BotSharp.Plugin.RabbitMQ.Controllers;
+
+[Authorize]
+[ApiController]
+public class RabbitMQController : ControllerBase
+{
+ private readonly IServiceProvider _services;
+ private readonly IMQService _mqService;
+ private readonly ILogger _logger;
+
+ public RabbitMQController(
+ IServiceProvider services,
+ IMQService mqService,
+ ILogger logger)
+ {
+ _services = services;
+ _mqService = mqService;
+ _logger = logger;
+ }
+
+ ///
+ /// Publish a scheduled message to be delivered after a delay
+ ///
+ /// The scheduled message request
+ [HttpPost("/message-queue/publish")]
+ public async Task PublishScheduledMessage([FromBody] PublishScheduledMessageRequest request)
+ {
+ if (request == null)
+ {
+ return BadRequest(new PublishMessageResponse { Success = false, Error = "Request body is required." });
+ }
+
+ try
+ {
+ var payload = new ScheduledMessagePayload
+ {
+ Name = request.Name ?? "Hello"
+ };
+
+ var success = await _mqService.PublishAsync(
+ payload,
+ options: new()
+ {
+ TopicName = "my.exchange",
+ RoutingKey = "my.routing",
+ DelayMilliseconds = request.DelayMilliseconds ?? 10000,
+ MessageId = request.MessageId
+ });
+ return Ok(new { Success = success });
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Failed to publish scheduled message");
+ return StatusCode(StatusCodes.Status500InternalServerError,
+ new PublishMessageResponse { Success = false, Error = ex.Message });
+ }
+ }
+
+ ///
+ /// Unsubscribe a consumer
+ ///
+ ///
+ ///
+ [HttpPost("/message-queue/unsubscribe/consumer")]
+ public async Task UnSubscribeConsuer([FromBody] UnsubscribeConsumerRequest request)
+ {
+ if (request == null)
+ {
+ return BadRequest(new { Success = false, Error = "Request body is required." });
+ }
+
+ try
+ {
+ var success = await _mqService.UnsubscribeAsync(request.Name);
+ return Ok(new { Success = success });
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Failed to unsubscribe consumer {request.Name}");
+ return StatusCode(StatusCodes.Status500InternalServerError,
+ new { Success = false, Error = ex.Message });
+ }
+ }
+}
+
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs
new file mode 100644
index 000000000..cb89c2976
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs
@@ -0,0 +1,11 @@
+using RabbitMQ.Client;
+
+namespace BotSharp.Plugin.RabbitMQ.Interfaces;
+
+public interface IRabbitMQConnection : IDisposable
+{
+ bool IsConnected { get; }
+ IConnection Connection { get; }
+ Task CreateChannelAsync();
+ Task ConnectAsync();
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs
new file mode 100644
index 000000000..ad655b795
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs
@@ -0,0 +1,31 @@
+namespace BotSharp.Plugin.RabbitMQ.Models;
+
+///
+/// Request model for publishing a scheduled message
+///
+public class PublishScheduledMessageRequest
+{
+ public string? Name { get; set; }
+
+ public long? DelayMilliseconds { get; set; }
+
+ public string? MessageId { get; set; }
+}
+
+
+///
+/// Response model for publish operations
+///
+public class PublishMessageResponse
+{
+ ///
+ /// Whether the message was successfully published
+ ///
+ public bool Success { get; set; }
+
+ ///
+ /// Error message if publish failed
+ ///
+ public string? Error { get; set; }
+}
+
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs
new file mode 100644
index 000000000..93754d455
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs
@@ -0,0 +1,24 @@
+namespace BotSharp.Plugin.RabbitMQ.Models;
+
+internal class RabbitMQConsumerConfig
+{
+ ///
+ /// The exchange name (topic in some MQ systems).
+ ///
+ internal string ExchangeName { get; set; } = "rabbitmq.exchange";
+
+ ///
+ /// The queue name (subscription in some MQ systems).
+ ///
+ internal string QueueName { get; set; } = "rabbitmq.queue";
+
+ ///
+ /// The routing key (filter in some MQ systems).
+ ///
+ internal string RoutingKey { get; set; } = "rabbitmq.routing";
+
+ ///
+ /// Additional arguments for the consumer configuration.
+ ///
+ internal Dictionary Arguments { get; set; } = new();
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs
new file mode 100644
index 000000000..2180fb2d7
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs
@@ -0,0 +1,9 @@
+namespace BotSharp.Plugin.RabbitMQ.Models;
+
+///
+/// Payload for scheduled/delayed messages
+///
+public class ScheduledMessagePayload
+{
+ public string Name { get; set; }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs
new file mode 100644
index 000000000..509d432b2
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs
@@ -0,0 +1,6 @@
+namespace BotSharp.Plugin.RabbitMQ.Models;
+
+public class UnsubscribeConsumerRequest
+{
+ public string Name { get; set; }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs
new file mode 100644
index 000000000..ff45dfe48
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs
@@ -0,0 +1,56 @@
+using BotSharp.Plugin.RabbitMQ.Services;
+using Microsoft.AspNetCore.Builder;
+using Microsoft.Extensions.Configuration;
+
+namespace BotSharp.Plugin.RabbitMQ;
+
+public class RabbitMQPlugin : IBotSharpAppPlugin
+{
+ public string Id => "3f93407f-3c37-4e25-be28-142a2da9b514";
+ public string Name => "RabbitMQ";
+ public string Description => "Handle AI messages in RabbitMQ.";
+ public string IconUrl => "https://icon-library.com/images/message-queue-icon/message-queue-icon-13.jpg";
+
+ public void RegisterDI(IServiceCollection services, IConfiguration config)
+ {
+ var settings = new RabbitMQSettings();
+ config.Bind("RabbitMQ", settings);
+ services.AddSingleton(settings);
+
+ var mqSettings = new MessageQueueSettings();
+ config.Bind("MessageQueue", mqSettings);
+
+ if (mqSettings.Enabled && mqSettings.Provider.IsEqualTo("RabbitMQ"))
+ {
+ services.AddSingleton();
+ services.AddSingleton();
+ }
+ }
+
+ public void Configure(IApplicationBuilder app)
+ {
+#if DEBUG
+ var sp = app.ApplicationServices;
+ var mqSettings = sp.GetRequiredService();
+
+ if (mqSettings.Enabled && mqSettings.Provider.IsEqualTo("RabbitMQ"))
+ {
+ var mqService = sp.GetRequiredService();
+ var loggerFactory = sp.GetRequiredService();
+
+ // Create and subscribe the consumer using the abstract interface
+ var scheduledConsumer = new ScheduledMessageConsumer(sp, loggerFactory.CreateLogger());
+ mqService.SubscribeAsync(nameof(ScheduledMessageConsumer), scheduledConsumer)
+ .ConfigureAwait(false)
+ .GetAwaiter()
+ .GetResult();
+
+ var dummyConsumer = new DummyMessageConsumer(sp, loggerFactory.CreateLogger());
+ mqService.SubscribeAsync(nameof(DummyMessageConsumer), dummyConsumer)
+ .ConfigureAwait(false)
+ .GetAwaiter()
+ .GetResult();
+ }
+#endif
+ }
+}
\ No newline at end of file
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs
new file mode 100644
index 000000000..0117bad14
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs
@@ -0,0 +1,318 @@
+using Polly;
+using Polly.Retry;
+using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
+using System.Collections.Concurrent;
+
+namespace BotSharp.Plugin.RabbitMQ.Services;
+
+public class RabbitMQService : IMQService
+{
+ private readonly IRabbitMQConnection _mqConnection;
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+
+ private readonly int _retryCount = 5;
+ private bool _disposed = false;
+ private static readonly ConcurrentDictionary _consumers = [];
+
+ public RabbitMQService(
+ IRabbitMQConnection mqConnection,
+ IServiceProvider services,
+ ILogger logger)
+ {
+ _mqConnection = mqConnection;
+ _services = services;
+ _logger = logger;
+ }
+
+ public async Task SubscribeAsync(string key, IMQConsumer consumer)
+ {
+ if (_consumers.ContainsKey(key))
+ {
+ _logger.LogWarning($"Consumer with key '{key}' is already subscribed.");
+ return false;
+ }
+
+ var registration = await CreateConsumerRegistrationAsync(consumer);
+ if (registration != null && _consumers.TryAdd(key, registration))
+ {
+ var config = consumer.Config as RabbitMQConsumerConfig ?? new();
+ _logger.LogInformation($"Consumer '{key}' subscribed to queue '{config.QueueName}'.");
+ return true;
+ }
+
+ return false;
+ }
+
+ public async Task UnsubscribeAsync(string key)
+ {
+ if (!_consumers.TryRemove(key, out var registration))
+ {
+ return false;
+ }
+
+ try
+ {
+ if (registration.Channel != null)
+ {
+ registration.Channel.Dispose();
+ }
+ registration.Consumer.Dispose();
+ _logger.LogInformation($"Consumer '{key}' unsubscribed.");
+ return true;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error unsubscribing consumer '{key}'.");
+ return false;
+ }
+ }
+
+ private async Task CreateConsumerRegistrationAsync(IMQConsumer consumer)
+ {
+ try
+ {
+ var channel = await CreateChannelAsync(consumer);
+
+ var config = consumer.Config as RabbitMQConsumerConfig ?? new();
+ var registration = new ConsumerRegistration(consumer, channel);
+
+ var asyncConsumer = new AsyncEventingBasicConsumer(channel);
+ asyncConsumer.ReceivedAsync += async (sender, eventArgs) =>
+ {
+ await ConsumeEventAsync(registration, eventArgs);
+ };
+
+ await channel.BasicConsumeAsync(
+ queue: config.QueueName,
+ autoAck: false,
+ consumer: asyncConsumer);
+
+ _logger.LogWarning($"RabbitMQ consuming queue '{config.QueueName}'.");
+ return registration;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error when register consumer in RabbitMQ.");
+ return null;
+ }
+ }
+
+ private async Task CreateChannelAsync(IMQConsumer consumer)
+ {
+ if (!_mqConnection.IsConnected)
+ {
+ await _mqConnection.ConnectAsync();
+ }
+
+ var config = consumer.Config as RabbitMQConsumerConfig ?? new();
+ var channel = await _mqConnection.CreateChannelAsync();
+ _logger.LogWarning($"Created RabbitMQ channel {channel.ChannelNumber} for queue '{config.QueueName}'");
+
+ var args = new Dictionary
+ {
+ ["x-delayed-type"] = "direct"
+ };
+
+ if (config.Arguments != null)
+ {
+ foreach (var kvp in config.Arguments)
+ {
+ args[kvp.Key] = kvp.Value;
+ }
+ }
+
+ await channel.ExchangeDeclareAsync(
+ exchange: config.ExchangeName,
+ type: "x-delayed-message",
+ durable: true,
+ autoDelete: false,
+ arguments: args);
+
+ await channel.QueueDeclareAsync(
+ queue: config.QueueName,
+ durable: true,
+ exclusive: false,
+ autoDelete: false);
+
+ await channel.QueueBindAsync(
+ queue: config.QueueName,
+ exchange: config.ExchangeName,
+ routingKey: config.RoutingKey);
+
+ return channel;
+ }
+
+ private async Task ConsumeEventAsync(ConsumerRegistration registration, BasicDeliverEventArgs eventArgs)
+ {
+ var data = string.Empty;
+ var config = registration.Consumer.Config as RabbitMQConsumerConfig ?? new();
+
+ try
+ {
+ data = Encoding.UTF8.GetString(eventArgs.Body.Span);
+ _logger.LogInformation($"Message received on '{config.QueueName}', id: {eventArgs.BasicProperties?.MessageId}, data: {data}");
+
+ var isHandled = await registration.Consumer.HandleMessageAsync(config.QueueName, data);
+ if (registration.Channel?.IsOpen == true)
+ {
+ if (isHandled)
+ {
+ await registration.Channel.BasicAckAsync(eventArgs.DeliveryTag, multiple: false);
+ }
+ else
+ {
+ await registration.Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: false);
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error consuming message on queue '{config.QueueName}': {data}");
+ if (registration.Channel?.IsOpen == true)
+ {
+ await registration.Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: false);
+ }
+ }
+ }
+
+ public async Task PublishAsync(T payload, MQPublishOptions options)
+ {
+ try
+ {
+ if (options == null)
+ {
+ return false;
+ }
+
+ if (!_mqConnection.IsConnected)
+ {
+ await _mqConnection.ConnectAsync();
+ }
+
+ var isPublished = false;
+ var policy = BuildRetryPolicy();
+ await policy.Execute(async () =>
+ {
+ var channelPool = RabbitMQChannelPoolFactory.GetChannelPool(_services, _mqConnection);
+ var channel = channelPool.Get();
+
+ try
+ {
+ var args = new Dictionary
+ {
+ ["x-delayed-type"] = "direct"
+ };
+
+ if (!options.Arguments.IsNullOrEmpty())
+ {
+ foreach (var kvp in options.Arguments)
+ {
+ args[kvp.Key] = kvp.Value;
+ }
+ }
+
+ await channel.ExchangeDeclareAsync(
+ exchange: options.TopicName,
+ type: "x-delayed-message",
+ durable: true,
+ autoDelete: false,
+ arguments: args);
+
+ var messageId = options.MessageId ?? Guid.NewGuid().ToString();
+ var message = new MQMessage(payload, messageId);
+ var body = ConvertToBinary(message, options.JsonOptions);
+ var properties = new BasicProperties
+ {
+ MessageId = messageId,
+ DeliveryMode = DeliveryModes.Persistent,
+ Headers = new Dictionary
+ {
+ ["x-delay"] = options.DelayMilliseconds
+ }
+ };
+
+ await channel.BasicPublishAsync(
+ exchange: options.TopicName,
+ routingKey: options.RoutingKey,
+ mandatory: true,
+ basicProperties: properties,
+ body: body);
+
+ isPublished = true;
+ }
+ catch (Exception)
+ {
+ throw;
+ }
+ finally
+ {
+ channelPool.Return(channel);
+ }
+ });
+
+ return isPublished;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error when RabbitMQ publish message.");
+ return false;
+ }
+ }
+
+ private RetryPolicy BuildRetryPolicy()
+ {
+ return Policy.Handle().WaitAndRetry(
+ _retryCount,
+ retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
+ (ex, time) =>
+ {
+ _logger.LogError(ex, $"RabbitMQ publish error: after {time.TotalSeconds:n1}s");
+ });
+ }
+
+ private static byte[] ConvertToBinary(T data, JsonSerializerOptions? jsonOptions = null)
+ {
+ var jsonStr = JsonSerializer.Serialize(data, jsonOptions);
+ var body = Encoding.UTF8.GetBytes(jsonStr);
+ return body;
+ }
+
+ public void Dispose()
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ _logger.LogWarning($"Disposing {nameof(RabbitMQService)}");
+
+ foreach (var item in _consumers)
+ {
+ if (item.Value.Channel != null)
+ {
+ item.Value.Channel.Dispose();
+ }
+ item.Value.Consumer.Dispose();
+ }
+
+ _disposed = true;
+ GC.SuppressFinalize(this);
+ }
+
+ ///
+ /// Internal class to track consumer registrations with their RabbitMQ channels.
+ ///
+ private class ConsumerRegistration
+ {
+ public IMQConsumer Consumer { get; }
+ public IChannel? Channel { get; }
+
+ public ConsumerRegistration(IMQConsumer consumer, IChannel? channel)
+ {
+ Consumer = consumer;
+ Channel = channel;
+ }
+ }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs
new file mode 100644
index 000000000..0e61b5c71
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs
@@ -0,0 +1,10 @@
+namespace BotSharp.Plugin.RabbitMQ.Settings;
+
+public class RabbitMQSettings
+{
+ public string HostName { get; set; } = "localhost";
+ public int Port { get; set; } = 5672;
+ public string UserName { get; set; } = "guest";
+ public string Password { get; set; } = "guest";
+ public string VirtualHost { get; set; } = "/";
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs
new file mode 100644
index 000000000..0a8a8c3a5
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs
@@ -0,0 +1,38 @@
+global using System;
+global using System.Collections.Generic;
+global using System.Text;
+global using System.Linq;
+global using System.Text.Json;
+global using System.Net.Mime;
+global using System.Threading.Tasks;
+global using Microsoft.Extensions.DependencyInjection;
+global using Microsoft.Extensions.Logging;
+global using BotSharp.Abstraction.Agents;
+global using BotSharp.Abstraction.Conversations;
+global using BotSharp.Abstraction.Plugins;
+global using BotSharp.Abstraction.Conversations.Models;
+global using BotSharp.Abstraction.Functions;
+global using BotSharp.Abstraction.Agents.Models;
+global using BotSharp.Abstraction.Agents.Enums;
+global using BotSharp.Abstraction.Files.Enums;
+global using BotSharp.Abstraction.Files.Models;
+global using BotSharp.Abstraction.Files.Converters;
+global using BotSharp.Abstraction.Files;
+global using BotSharp.Abstraction.MLTasks;
+global using BotSharp.Abstraction.Utilities;
+global using BotSharp.Abstraction.Agents.Settings;
+global using BotSharp.Abstraction.Functions.Models;
+global using BotSharp.Abstraction.Repositories;
+global using BotSharp.Abstraction.Settings;
+global using BotSharp.Abstraction.Messaging;
+global using BotSharp.Abstraction.Messaging.Models.RichContent;
+global using BotSharp.Abstraction.Options;
+global using BotSharp.Abstraction.Models;
+global using BotSharp.Abstraction.Infrastructures.MessageQueues;
+global using BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+
+global using BotSharp.Plugin.RabbitMQ.Settings;
+global using BotSharp.Plugin.RabbitMQ.Models;
+global using BotSharp.Plugin.RabbitMQ.Interfaces;
+global using BotSharp.Plugin.RabbitMQ.Consumers;
+global using BotSharp.Plugin.RabbitMQ.Connections;
\ No newline at end of file
diff --git a/src/WebStarter/appsettings.json b/src/WebStarter/appsettings.json
index e4ad7bf57..fb7367dd2 100644
--- a/src/WebStarter/appsettings.json
+++ b/src/WebStarter/appsettings.json
@@ -1006,6 +1006,7 @@
"Language": "en"
}
},
+
"A2AIntegration": {
"Enabled": true,
"DefaultTimeoutSeconds": 30,
@@ -1018,12 +1019,27 @@
}
]
},
+
+ "MessageQueue": {
+ "Enabled": false,
+ "Provider": "RabbitMQ"
+ },
+
+ "RabbitMQ": {
+ "HostName": "localhost",
+ "Port": 5672,
+ "UserName": "guest",
+ "Password": "guest",
+ "VirtualHost": "/"
+ },
+
"PluginLoader": {
"Assemblies": [
"BotSharp.Core",
"BotSharp.Core.A2A",
"BotSharp.Core.SideCar",
"BotSharp.Core.Crontab",
+ "BotSharp.Core.Rules",
"BotSharp.Core.Realtime",
"BotSharp.Logger",
"BotSharp.Plugin.MongoStorage",