    Learning Elsa V3: Flowchart in Detail (Part 1)

    Reposted from: https://www.cnblogs.com/fanshaoO/p/18368210

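    In the previous posts we learned the basics of Elsa through its UI. If you followed along hands-on, you may have noticed that the root of a workflow definition, that is, the workflow canvas, is itself an activity: Flowchart. This post walks through Flowchart's execution logic.

    Flowchart source code

    For convenience, here is the full Flowchart source.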

    using System.ComponentModel;
    using System.Runtime.CompilerServices;
    using Elsa.Extensions;
    using Elsa.Workflows.Activities.Flowchart.Contracts;
    using Elsa.Workflows.Activities.Flowchart.Extensions;
    using Elsa.Workflows.Activities.Flowchart.Models;
    using Elsa.Workflows.Attributes;
    using Elsa.Workflows.Contracts;
    using Elsa.Workflows.Options;
    using Elsa.Workflows.Signals;
    using Microsoft.Extensions.Logging;
    
    namespace Elsa.Workflows.Activities.Flowchart.Activities;
    
    /// <summary>
    /// A flowchart consists of a collection of activities and connections between them.
    /// </summary>
    [Activity("Elsa", "Flow", "A flowchart is a collection of activities and connections between them.")]
    [Browsable(false)]
    public class Flowchart : Container
    {
        internal const string ScopeProperty = "Scope";
    
        /// <inheritdoc />
        public Flowchart([CallerFilePath] string? source = default, [CallerLineNumber] int? line = default) : base(source, line)
        {
            OnSignalReceived<ScheduleActivityOutcomes>(OnScheduleOutcomesAsync);
            OnSignalReceived<ScheduleChildActivity>(OnScheduleChildActivityAsync);
            OnSignalReceived<CancelSignal>(OnActivityCanceledAsync);
        }
    
        /// <summary>
        /// The activity to execute when the flowchart starts.
        /// </summary>
        [Port]
        [Browsable(false)]
        public IActivity? Start { get; set; }
    
        /// <summary>
        /// A list of connections between activities.
        /// </summary>
        public ICollection<Connection> Connections { get; set; } = new List<Connection>();
    
        /// <inheritdoc />
        protected override async ValueTask ScheduleChildrenAsync(ActivityExecutionContext context)
        {
            var startActivity = GetStartActivity(context);
    
            if (startActivity == null)
            {
                // Nothing else to execute.
                await context.CompleteActivityAsync();
                return;
            }
    
            // Schedule the start activity.
            await context.ScheduleActivityAsync(startActivity, OnChildCompletedAsync);
        }
    
        private IActivity? GetStartActivity(ActivityExecutionContext context)
        {
            // If there's a trigger that triggered this workflow, use that.
            var triggerActivityId = context.WorkflowExecutionContext.TriggerActivityId;
            var triggerActivity = triggerActivityId != null ? Activities.FirstOrDefault(x => x.Id == triggerActivityId) : default;
    
            if (triggerActivity != null)
                return triggerActivity;
    
            // If an explicit Start activity was provided, use that.
            if (Start != null)
                return Start;
    
            // If there is a Start activity on the flowchart, use that.
            var startActivity = Activities.FirstOrDefault(x => x is Start);
    
            if (startActivity != null)
                return startActivity;
    
            // If there's an activity marked as "Can Start Workflow", use that.
            var canStartWorkflowActivity = Activities.FirstOrDefault(x => x.GetCanStartWorkflow());
    
            if (canStartWorkflowActivity != null)
                return canStartWorkflowActivity;
    
            // If there is a single activity that has no inbound connections, use that.
            var root = GetRootActivity();
    
            if (root != null)
                return root;
    
            // If no start activity found, return the first activity.
            return Activities.FirstOrDefault();
        }
    
        /// <summary>
        /// Checks if there is any pending work for the flowchart.
        /// </summary>
        private bool HasPendingWork(ActivityExecutionContext context)
        {
            var workflowExecutionContext = context.WorkflowExecutionContext;
            var activityIds = Activities.Select(x => x.Id).ToList();
            var descendantContexts = context.GetDescendents().Where(x => x.ParentActivityExecutionContext == context).ToList();
            var activityExecutionContexts = descendantContexts.Where(x => activityIds.Contains(x.Activity.Id)).ToList();
    
            var hasPendingWork = workflowExecutionContext.Scheduler.List().Any(workItem =>
            {
                var ownerInstanceId = workItem.Owner?.Id;
    
                if (ownerInstanceId == null)
                    return false;
    
                if (ownerInstanceId == context.Id)
                    return true;
    
                var ownerContext = context.WorkflowExecutionContext.ActivityExecutionContexts.First(x => x.Id == ownerInstanceId);
                var ancestors = ownerContext.GetAncestors().ToList();
    
                return ancestors.Any(x => x == context);
            });
    
            var hasRunningActivityInstances = activityExecutionContexts.Any(x => x.Status == ActivityStatus.Running);
    
            return hasRunningActivityInstances || hasPendingWork;
        }
    
        private IActivity? GetRootActivity()
        {
            // Get the first activity that has no inbound connections.
            var query =
                from activity in Activities
                let inboundConnections = Connections.Any(x => x.Target.Activity == activity)
                where !inboundConnections
                select activity;
    
            var rootActivity = query.FirstOrDefault();
            return rootActivity;
        }
    
        private async ValueTask OnChildCompletedAsync(ActivityCompletedContext context)
        {
            var logger = context.GetRequiredService<ILogger<Flowchart>>();
            var flowchartContext = context.TargetContext;
            var completedActivityContext = context.ChildContext;
            var completedActivity = completedActivityContext.Activity;
            var result = context.Result;
    
            // If the complete activity's status is anything but "Completed", do not schedule its outbound activities.
            var scheduleChildren = completedActivityContext.Status == ActivityStatus.Completed;
            var outcomeNames = result is Outcomes outcomes
                ? outcomes.Names
                : [null!, "Done"];
    
            // Only query the outbound connections if the completed activity wasn't already completed.
            var outboundConnections = Connections.Where(connection => connection.Source.Activity == completedActivity && outcomeNames.Contains(connection.Source.Port)).ToList();
            var children = outboundConnections.Select(x => x.Target.Activity).ToList();
            var scope = flowchartContext.GetProperty(ScopeProperty, () => new FlowScope());
    
            scope.RegisterActivityExecution(completedActivity);
    
            // If the complete activity is a terminal node, complete the flowchart immediately.
            if (completedActivity is ITerminalNode)
            {
                await flowchartContext.CompleteActivityAsync();
            }
            else if (scheduleChildren)
            {
                if (children.Any())
                {
                    // Schedule each child, but only if all of its left inbound activities have already executed.
                    foreach (var activity in children)
                    {
                        var existingActivity = scope.ContainsActivity(activity);
                        scope.AddActivity(activity);
    
                        var inboundActivities = Connections.LeftInboundActivities(activity).ToList();
    
                        // If the completed activity is not part of the left inbound path, always allow its children to be scheduled.
                        if (!inboundActivities.Contains(completedActivity))
                        {
                            await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
                            continue;
                        }
    
                        // If the activity is anything but a join activity, only schedule it if all of its left-inbound activities have executed, effectively implementing a "wait all" join. 
                        if (activity is not IJoinNode)
                        {
                            var executionCount = scope.GetExecutionCount(activity);
                            var haveInboundActivitiesExecuted = inboundActivities.All(x => scope.GetExecutionCount(x) > executionCount);
    
                            if (haveInboundActivitiesExecuted) 
                                await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
                        }
                        else
                        {
                            // Select an existing activity execution context for this activity, if any.
                            var joinContext = flowchartContext.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x =>
                                x.ParentActivityExecutionContext == flowchartContext && x.Activity == activity);
                            var scheduleWorkOptions = new ScheduleWorkOptions
                            {
                                CompletionCallback = OnChildCompletedAsync,
                                ExistingActivityExecutionContext = joinContext,
                                PreventDuplicateScheduling = true
                            };
    
                            if (joinContext != null)
                                logger.LogDebug("Next activity {ChildActivityId} is a join activity. Attaching to existing join context {JoinContext}", activity.Id, joinContext.Id);
                            else if (!existingActivity)
                                logger.LogDebug("Next activity {ChildActivityId} is a join activity. Creating new join context", activity.Id);
                            else
                            {
                                logger.LogDebug("Next activity {ChildActivityId} is a join activity. Join context was not found, but activity is already being created", activity.Id);
                                continue;
                            }
    
                            await flowchartContext.ScheduleActivityAsync(activity, scheduleWorkOptions);
                        }
                    }
                }
    
                if (!children.Any())
                {
                    await CompleteIfNoPendingWorkAsync(flowchartContext);
                }
            }
    
            flowchartContext.SetProperty(ScopeProperty, scope);
        }
    
        private async Task CompleteIfNoPendingWorkAsync(ActivityExecutionContext context)
        {
            var hasPendingWork = HasPendingWork(context);
    
            if (!hasPendingWork)
            {
                var hasFaultedActivities = context.GetActiveChildren().Any(x => x.Status == ActivityStatus.Faulted);
    
                if (!hasFaultedActivities)
                {
                    await context.CompleteActivityAsync();
                }
            }
        }
    
        private async ValueTask OnScheduleOutcomesAsync(ScheduleActivityOutcomes signal, SignalContext context)
        {
            var flowchartContext = context.ReceiverActivityExecutionContext;
            var schedulingActivityContext = context.SenderActivityExecutionContext;
            var schedulingActivity = schedulingActivityContext.Activity;
            var outcomes = signal.Outcomes;
            var outboundConnections = Connections.Where(connection => connection.Source.Activity == schedulingActivity && outcomes.Contains(connection.Source.Port!)).ToList();
            var outboundActivities = outboundConnections.Select(x => x.Target.Activity).ToList();
    
            if (outboundActivities.Any())
            {
                // Schedule each child.
                foreach (var activity in outboundActivities) await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
            }
        }
    
        private async ValueTask OnScheduleChildActivityAsync(ScheduleChildActivity signal, SignalContext context)
        {
            var flowchartContext = context.ReceiverActivityExecutionContext;
            var activity = signal.Activity;
            var activityExecutionContext = signal.ActivityExecutionContext;
    
            if (activityExecutionContext != null)
            {
                await flowchartContext.ScheduleActivityAsync(activityExecutionContext.Activity, new ScheduleWorkOptions
                {
                    ExistingActivityExecutionContext = activityExecutionContext,
                    CompletionCallback = OnChildCompletedAsync,
                    Input = signal.Input
                });
            }
            else
            {
                await flowchartContext.ScheduleActivityAsync(activity, new ScheduleWorkOptions
                {
                    CompletionCallback = OnChildCompletedAsync,
                    Input = signal.Input
                });
            }
        }
    
        private async ValueTask OnActivityCanceledAsync(CancelSignal signal, SignalContext context)
        {
            await CompleteIfNoPendingWorkAsync(context.ReceiverActivityExecutionContext);
        }
    }
    

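    First, the description argument of the Activity attribute summarizes what Flowchart does: "A flowchart is a collection of activities and connections between them." In other words, Flowchart stores a set of activities together with the connections between them. With that data, it can execute the activities in the order the connections define.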
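    To make the "activities plus connections" idea concrete, here is a minimal sketch. Node, Edge, and MiniFlowchart are hypothetical stand-ins invented for illustration and are not Elsa types; the real Flowchart schedules work through callbacks rather than walking the graph directly.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified stand-ins for an activity and a connection (not Elsa types).
public sealed record Node(string Id);
public sealed record Edge(Node Source, Node Target, string Port = "Done");

public sealed class MiniFlowchart
{
    public List<Node> Activities { get; } = new();
    public List<Edge> Connections { get; } = new();

    // Returns the activities reachable from `source` through the given outcome port.
    public List<Node> Next(Node source, string port = "Done") =>
        Connections
            .Where(c => c.Source == source && c.Port == port)
            .Select(c => c.Target)
            .ToList();

    // Follows "Done" connections breadth-first from `start` and returns the visit order.
    // The visited set ensures each node is listed once, even if the graph has a cycle.
    public List<string> Walk(Node start)
    {
        var order = new List<string>();
        var visited = new HashSet<Node>();
        var queue = new Queue<Node>();
        queue.Enqueue(start);

        while (queue.Count > 0)
        {
            var current = queue.Dequeue();
            if (!visited.Add(current)) continue;
            order.Add(current.Id);
            foreach (var next in Next(current)) queue.Enqueue(next);
        }

        return order;
    }
}
```

    With three nodes A, B, C and the connections A to B and B to C, Walk(A) returns A, B, C.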

    Container

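    Reading on, we can see that Flowchart does not inherit directly from the Activity base class; it inherits from Container.
    Container exposes two collection properties, Activities and Variables, which hold the nodes and the variables respectively.
    In Container's execution entry point, the variables are first given names and registered.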

    protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
    {
        // Ensure variables have names.
        EnsureNames(Variables);
    
        // Register variables.
        context.ExpressionExecutionContext.Memory.Declare(Variables);
    
        // Schedule children.
        await ScheduleChildrenAsync(context);
    }
    

    The last step calls ScheduleChildrenAsync. As its definition shows, it is a virtual method that subclasses can override.

    protected virtual ValueTask ScheduleChildrenAsync(ActivityExecutionContext context)
    {
        ScheduleChildren(context);
        return ValueTask.CompletedTask;
    }
    

    In Flowchart, the execution entry point is exactly this overridden ScheduleChildrenAsync method.
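    The relationship between Container and Flowchart follows the template method pattern: the base class fixes the order of steps and the subclass replaces one of them. The sketch below illustrates that shape only; BaseContainer and FlowchartLike are hypothetical names, and the log strings stand in for the real work.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for Container: ExecuteAsync fixes the order of steps
// and delegates child scheduling to a virtual method.
public class BaseContainer
{
    public List<string> Log { get; } = new();

    public async ValueTask ExecuteAsync()
    {
        Log.Add("declare variables");
        await ScheduleChildrenAsync();
    }

    // Default behavior; subclasses may replace it.
    protected virtual ValueTask ScheduleChildrenAsync()
    {
        Log.Add("default child scheduling");
        return ValueTask.CompletedTask;
    }
}

// Hypothetical stand-in for Flowchart: only the scheduling step changes.
public class FlowchartLike : BaseContainer
{
    protected override ValueTask ScheduleChildrenAsync()
    {
        Log.Add("schedule start activity only");
        return ValueTask.CompletedTask;
    }
}
```

    Calling ExecuteAsync on a FlowchartLike instance logs "declare variables" followed by "schedule start activity only", so the variable setup in the base class always runs before the overridden scheduling step.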

    Flowchart execution logic

    Back to the main topic: let's look at Flowchart's entry point, the ScheduleChildrenAsync method.

    protected override async ValueTask ScheduleChildrenAsync(ActivityExecutionContext context)
    {
        var startActivity = GetStartActivity(context);
    
        if (startActivity == null)
        {
            // Nothing else to execute.
            await context.CompleteActivityAsync();
            return;
        }
    
        // Schedule the start activity.
        await context.ScheduleActivityAsync(startActivity, OnChildCompletedAsync);
    }
    

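    A quick pass over these few lines: the method first resolves the start activity, that is, the first node to execute. If none can be found, it completes the Flowchart activity right away, which ends the workflow when the flowchart is its root.
    If a start activity is found, it is scheduled together with a callback function. That callback is the key to executing the workflow's nodes in order.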

    GetStartActivity

    So how does it find the start node?

    private IActivity? GetStartActivity(ActivityExecutionContext context)
    {
        // If there's a trigger that triggered this workflow, use that.
        var triggerActivityId = context.WorkflowExecutionContext.TriggerActivityId;
        var triggerActivity = triggerActivityId != null ? Activities.FirstOrDefault(x => x.Id == triggerActivityId) : default;
    
        if (triggerActivity != null)
            return triggerActivity;
    
        // If an explicit Start activity was provided, use that.
        if (Start != null)
            return Start;
    
        // If there is a Start activity on the flowchart, use that.
        var startActivity = Activities.FirstOrDefault(x => x is Start);
    
        if (startActivity != null)
            return startActivity;
    
        // If there's an activity marked as "Can Start Workflow", use that.
        var canStartWorkflowActivity = Activities.FirstOrDefault(x => x.GetCanStartWorkflow());
    
        if (canStartWorkflowActivity != null)
            return canStartWorkflowActivity;
    
        // If there is a single activity that has no inbound connections, use that.
        var root = GetRootActivity();
    
        if (root != null)
            return root;
    
        // If no start activity found, return the first activity.
        return Activities.FirstOrDefault();
    }
    

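    From the top of the method we can see that the highest-priority candidate is, perhaps surprisingly, not Start but the trigger activity. This branch applies only when TriggerActivityId is set on the workflow execution context, which the code comment describes as the trigger that triggered this workflow. What is a trigger activity? Examples are HTTP Endpoint, Event, and Cron: when you drag one of these onto the canvas, the Trigger workflow option is checked by default, as the figure shows at the bottom center. How triggering works will be covered in a later post; a quick mention is enough here.
    [Figure: image.png, the Trigger workflow option]
    If there is no trigger activity, Flowchart checks whether its Start property is set. If it is, a start node has been specified explicitly and becomes the workflow's start node.
    If Start is not set either, Flowchart looks for the first activity of type Start among all Activities and, if one exists, uses it as the start node.
    If Activities contains no Start node, Flowchart checks whether any node has the Start Of Workflow option checked (GetCanStartWorkflow in code) and uses the first such activity as the start node.
    [Figure: image.png, the Start Of Workflow option]
    If still nothing matches, it tries to get the root node.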

    private IActivity? GetRootActivity()
    {
        // Get the first activity that has no inbound connections.
        var query =
            from activity in Activities
            let inboundConnections = Connections.Any(x => x.Target.Activity == activity)
            where !inboundConnections
            select activity;
    
        var rootActivity = query.FirstOrDefault();
        return rootActivity;
    }
    

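    From the code we can see that the root node is the first activity in Activities that is not the target of any connection, in other words the first node with no inbound connections.
    Note that when the nodes have no connections at all, every activity qualifies, so GetRootActivity already returns the first activity. The final fallback, Activities.FirstOrDefault(), is only reached when every activity has an inbound connection (for example, when the connections form a cycle) or when there are no activities, in which case it returns null.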
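    The root lookup can be reproduced with plain strings to see how it behaves on small graphs. This is a sketch that mirrors the shape of GetRootActivity; RootFinder and its tuple-based connections are hypothetical and not part of Elsa.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class RootFinder
{
    // Returns the first activity (in declaration order) that is not the target
    // of any connection, or null when every activity has an inbound connection.
    public static string? FindRoot(
        IReadOnlyList<string> activities,
        IReadOnlyList<(string Source, string Target)> connections) =>
        activities.FirstOrDefault(a => connections.All(c => c.Target != a));
}
```

    For the activities B, A with the single connection A to B, FindRoot returns A, because B has an inbound connection. With no connections it returns B, the first activity. With the cycle A to B and B to A it returns null, which is the case where a caller needs a further fallback.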

    As we can see, the logic for resolving the start activity is quite thorough.
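    The lookup order can also be written as a list of rules tried one after another, which makes the priorities easy to read. The sketch below assumes a flattened, hypothetical Candidate record with illustrative flags; it is one way to express the same fallback chain, not how Elsa implements it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical flattened view of an activity; the flags are illustrative only.
public sealed record Candidate(string Id, bool IsStartNode = false, bool CanStartWorkflow = false);

public static class StartResolver
{
    // Tries each rule in priority order and returns the first non-null match.
    public static Candidate? Resolve(
        IReadOnlyList<Candidate> activities,
        string? triggerActivityId = null,
        Candidate? explicitStart = null,
        Func<Candidate?>? findRoot = null)
    {
        var rules = new Func<Candidate?>[]
        {
            // 1. The activity that triggered the workflow, if any.
            () => triggerActivityId is null ? null : activities.FirstOrDefault(a => a.Id == triggerActivityId),
            // 2. An explicitly assigned start activity.
            () => explicitStart,
            // 3. The first Start node among the activities.
            () => activities.FirstOrDefault(a => a.IsStartNode),
            // 4. The first activity flagged as able to start the workflow.
            () => activities.FirstOrDefault(a => a.CanStartWorkflow),
            // 5. The root activity (no inbound connections), supplied by the caller.
            () => findRoot?.Invoke(),
            // 6. Last resort: the first activity.
            () => activities.FirstOrDefault()
        };

        // Select is lazy, so later rules run only when earlier ones return null.
        return rules.Select(rule => rule()).FirstOrDefault(found => found is not null);
    }
}
```

    Given the activities other, http (CanStartWorkflow), and start (IsStartNode), Resolve returns start when no trigger id is passed and http when triggerActivityId is "http".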

    context.ScheduleActivityAsync

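    Once the start activity has been resolved, the actual scheduling begins. context.ScheduleActivityAsync puts the start activity into the scheduling queue, after which the node is executed automatically. That execution logic will be analyzed in a later post. The important part of this call is its callback argument, OnChildCompletedAsync.

    Because the logic of OnChildCompletedAsync is fairly involved, we will continue with it in the next post.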
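    The idea of "enqueue an activity together with a completion callback" can be illustrated with a tiny queue-based scheduler. MiniScheduler and SchedulerDemo are hypothetical and much simpler than Elsa's scheduler; the sketch only shows how a callback that schedules the successor produces ordered execution.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical miniature scheduler: work items sit in a queue, and each item
// carries a callback that runs once the activity has finished.
public sealed class MiniScheduler
{
    private readonly Queue<(string Activity, Action<string>? OnCompleted)> _queue = new();
    public List<string> Executed { get; } = new();

    public void Schedule(string activity, Action<string>? onCompleted = null) =>
        _queue.Enqueue((activity, onCompleted));

    // Drains the queue; callbacks may schedule more work while it runs.
    public void Run()
    {
        while (_queue.Count > 0)
        {
            var (activity, onCompleted) = _queue.Dequeue();
            Executed.Add(activity);        // "execute" the activity
            onCompleted?.Invoke(activity); // let the owner decide what comes next
        }
    }
}

public static class SchedulerDemo
{
    // Runs A, then B, then C; the completion callback looks up the successor.
    public static List<string> Run()
    {
        var next = new Dictionary<string, string> { ["A"] = "B", ["B"] = "C" };
        var scheduler = new MiniScheduler();

        void OnChildCompleted(string completed)
        {
            if (next.TryGetValue(completed, out var successor))
                scheduler.Schedule(successor, OnChildCompleted);
        }

        scheduler.Schedule("A", OnChildCompleted);
        scheduler.Run();
        return scheduler.Executed;
    }
}
```

    SchedulerDemo.Run() returns A, B, C: only A is scheduled up front, and each completion callback enqueues the next node.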