Class Pipeline
Represents a graph of components and controls scheduling and message passing.
Namespace: Microsoft.Psi
Assembly: Microsoft.Psi.dll
Syntax
public class Pipeline : IDisposable
Constructors
Pipeline(String, DeliveryPolicy, Int32, Boolean, Boolean, DiagnosticsConfiguration)
Initializes a new instance of the Pipeline class.
Declaration
public Pipeline(string name, DeliveryPolicy defaultDeliveryPolicy, int threadCount, bool allowSchedulingOnExternalThreads, bool enableDiagnostics = false, DiagnosticsConfiguration diagnosticsConfiguration = null)
Parameters
Type | Name | Description |
---|---|---|
System.String | name | Pipeline name. |
DeliveryPolicy | defaultDeliveryPolicy | Pipeline-level default delivery policy (defaults to Unlimited if unspecified). |
System.Int32 | threadCount | Number of threads. |
System.Boolean | allowSchedulingOnExternalThreads | Whether to allow scheduling on external threads. |
System.Boolean | enableDiagnostics | Whether to enable collecting and publishing diagnostics information on the Pipeline.Diagnostics stream. |
DiagnosticsConfiguration | diagnosticsConfiguration | Optional diagnostics configuration information. |
Properties
Diagnostics
Gets the emitter that produces diagnostics information (diagnostics must be enabled when running the pipeline).
Declaration
public Emitter<PipelineDiagnostics> Diagnostics { get; }
Property Value
Type | Description |
---|---|
Emitter<PipelineDiagnostics> | |
Id
Gets pipeline ID.
Declaration
public int Id { get; }
Property Value
Type | Description |
---|---|
System.Int32 | |
LatestFiniteSourceCompletionTime
Gets or sets the completion time of the latest completed finite source component.
Declaration
protected DateTime? LatestFiniteSourceCompletionTime { get; set; }
Property Value
Type | Description |
---|---|
System.Nullable<DateTime> | |
Name
Gets pipeline name.
Declaration
public string Name { get; }
Property Value
Type | Description |
---|---|
System.String | |
NoRemainingCompletableComponents
Gets an AutoResetEvent that is signaled when no completable components remain.
Declaration
protected AutoResetEvent NoRemainingCompletableComponents { get; }
Property Value
Type | Description |
---|---|
AutoResetEvent | |
Remarks
This is an AutoResetEvent, so it resets automatically after releasing a single waiting thread.
ProgressReportInterval
Gets or sets the progress reporting time interval.
Declaration
public TimeSpan ProgressReportInterval { get; set; }
Property Value
Type | Description |
---|---|
TimeSpan | |
ReplayDescriptor
Gets replay descriptor.
Declaration
public ReplayDescriptor ReplayDescriptor { get; }
Property Value
Type | Description |
---|---|
ReplayDescriptor | |
StartTime
Gets the pipeline start time based on the pipeline clock.
Declaration
public DateTime StartTime { get; }
Property Value
Type | Description |
---|---|
DateTime | |
Methods
ConvertFromRealTime(DateTime)
Convert real datetime to virtual.
Declaration
public DateTime ConvertFromRealTime(DateTime time)
Parameters
Type | Name | Description |
---|---|---|
DateTime | time | Datetime to convert. |
Returns
Type | Description |
---|---|
DateTime | Converted datetime. |
ConvertFromRealTime(TimeSpan)
Convert real timespan to virtual.
Declaration
public TimeSpan ConvertFromRealTime(TimeSpan duration)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | duration | Duration to convert. |
Returns
Type | Description |
---|---|
TimeSpan | Converted time span. |
ConvertToRealTime(DateTime)
Convert virtual datetime to real time.
Declaration
public DateTime ConvertToRealTime(DateTime time)
Parameters
Type | Name | Description |
---|---|---|
DateTime | time | Datetime to convert. |
Returns
Type | Description |
---|---|
DateTime | Converted datetime. |
ConvertToRealTime(TimeSpan)
Convert virtual duration to real time.
Declaration
public TimeSpan ConvertToRealTime(TimeSpan duration)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | duration | Duration to convert. |
Returns
Type | Description |
---|---|
TimeSpan | Converted time span. |
Create(String, DeliveryPolicy, Int32, Boolean, Boolean, DiagnosticsConfiguration)
Create pipeline.
Declaration
public static Pipeline Create(string name = null, DeliveryPolicy deliveryPolicy = null, int threadCount = 0, bool allowSchedulingOnExternalThreads = false, bool enableDiagnostics = false, DiagnosticsConfiguration diagnosticsConfiguration = null)
Parameters
Type | Name | Description |
---|---|---|
System.String | name | Pipeline name. |
DeliveryPolicy | deliveryPolicy | Pipeline-level delivery policy. |
System.Int32 | threadCount | Number of threads. |
System.Boolean | allowSchedulingOnExternalThreads | Whether to allow scheduling on external threads. |
System.Boolean | enableDiagnostics | Indicates whether to enable collecting and publishing diagnostics information on the Pipeline.Diagnostics stream. |
DiagnosticsConfiguration | diagnosticsConfiguration | Optional diagnostics configuration information. |
Returns
Type | Description |
---|---|
Pipeline | Created pipeline. |
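As a minimal sketch of the factory method above, the following creates a named pipeline and reads back the Name and Id properties documented on this page. The name "Demo" and the thread count of 2 are illustrative values, not recommendations.

```csharp
using System;
using Microsoft.Psi;

public static class CreatePipelineExample
{
    public static void Main()
    {
        // Pipeline implements IDisposable, so a using block releases it on exit.
        // "Demo" and the thread count of 2 are illustrative values.
        using (Pipeline pipeline = Pipeline.Create(name: "Demo", threadCount: 2))
        {
            Console.WriteLine($"Name: {pipeline.Name}, Id: {pipeline.Id}");
        }
    }
}
```

All parameters of Create are optional, so `Pipeline.Create()` with no arguments is also valid according to the declaration.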
CreateAsyncReceiver<T>(Object, Func<T, Envelope, Task>, String)
Creates an input receiver associated with the specified component object, connected to an async message processing function. The expected signature of the message processing delegate is:
async Task Receive(T message, Envelope env);
Declaration
public Receiver<T> CreateAsyncReceiver<T>(object owner, Func<T, Envelope, Task> action, string name)
Parameters
Type | Name | Description |
---|---|---|
System.Object | owner | The component that owns the receiver. This is usually the state object that the receiver operates on. The receivers associated with the same owner are never executed concurrently. |
Func<T, Envelope, Task> | action | The action to execute when a message is delivered to this receiver. |
System.String | name | The debug name of the receiver. |
Returns
Type | Description |
---|---|
Receiver<T> | A new receiver. |
Type Parameters
Name | Description |
---|---|
T | The type of messages accepted by this receiver. |
CreateAsyncReceiver<T>(Object, Func<T, Task>, String)
Creates an input receiver associated with the specified component object, connected to an async message processing function. The expected signature of the message processing delegate is:
async Task Receive(T message);
Declaration
public Receiver<T> CreateAsyncReceiver<T>(object owner, Func<T, Task> action, string name)
Parameters
Type | Name | Description |
---|---|---|
System.Object | owner | The component that owns the receiver. This is usually the state object that the receiver operates on. The receivers associated with the same owner are never executed concurrently. |
Func<T, Task> | action | The action to execute when a message is delivered to this receiver. |
System.String | name | The debug name of the receiver. |
Returns
Type | Description |
---|---|
Receiver<T> | A new receiver. |
Type Parameters
Name | Description |
---|---|
T | The type of messages accepted by this receiver. |
CreateAsyncReceiver<T>(Object, Func<Message<T>, Task>, String)
Creates an input receiver associated with the specified component object, connected to an async message processing function. The expected signature of the message processing delegate is:
async Task Receive(Message<T> message);
Declaration
public Receiver<T> CreateAsyncReceiver<T>(object owner, Func<Message<T>, Task> action, string name)
Parameters
Type | Name | Description |
---|---|---|
System.Object | owner | The component that owns the receiver. This is usually the state object that the receiver operates on. The receivers associated with the same owner are never executed concurrently. |
Func<Message<T>, Task> | action | The action to execute when a message is delivered to this receiver. |
System.String | name | The debug name of the receiver. |
Returns
Type | Description |
---|---|
Receiver<T> | A new receiver. |
Type Parameters
Name | Description |
---|---|
T | The type of messages accepted by this receiver. |
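The async overloads above can be sketched with a small component that owns one receiver. `AsyncLogger` and its `ReceiveAsync` method are hypothetical names introduced for illustration; the only Pipeline member used is the `Func<T, Task>` overload of CreateAsyncReceiver, and the 10 ms delay merely stands in for real asynchronous work.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Psi;

// Hypothetical component that logs each incoming string after an awaited delay.
public class AsyncLogger
{
    public AsyncLogger(Pipeline pipeline)
    {
        // Binds to the Func<T, Task> overload; the property name doubles as the debug name.
        this.In = pipeline.CreateAsyncReceiver<string>(this, this.ReceiveAsync, nameof(this.In));
    }

    // Input receiver that upstream streams can be connected to.
    public Receiver<string> In { get; }

    // Returns Task (not void) so that it matches the Func<string, Task> delegate type.
    private async Task ReceiveAsync(string message)
    {
        await Task.Delay(10); // stand-in for real asynchronous work
        Console.WriteLine(message);
    }
}
```

Passing `this` as the owner means that, per the parameter description, this receiver never runs concurrently with any other receiver created for the same `AsyncLogger` instance.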
CreateEmitter<T>(Object, String, Emitter<T>.ValidateMessageHandler)
Create emitter.
Declaration
public Emitter<T> CreateEmitter<T>(object owner, string name, Emitter<T>.ValidateMessageHandler messageValidator = null)
Parameters
Type | Name | Description |
---|---|---|
System.Object | owner | Owner of emitter. |
System.String | name | Name of emitter. |
Emitter<T>.ValidateMessageHandler | messageValidator | An optional message validator. |
Returns
Type | Description |
---|---|
Emitter<T> | Created emitter. |
Type Parameters
Name | Description |
---|---|
T | Type of emitted messages. |
CreateReceiver<T>(Object, Action<T, Envelope>, String)
Creates an input receiver associated with the specified component object.
Declaration
public Receiver<T> CreateReceiver<T>(object owner, Action<T, Envelope> action, string name)
Parameters
Type | Name | Description |
---|---|---|
System.Object | owner | The component that owns the receiver. This is usually the state object that the receiver operates on. The receivers associated with the same owner are never executed concurrently. |
Action<T, Envelope> | action | The action to execute when a message is delivered to this receiver. |
System.String | name | The debug name of the receiver. |
Returns
Type | Description |
---|---|
Receiver<T> | A new receiver. |
Type Parameters
Name | Description |
---|---|
T | The type of messages accepted by this receiver. |
CreateReceiver<T>(Object, Action<T>, String)
Creates an input receiver associated with the specified component object.
Declaration
public Receiver<T> CreateReceiver<T>(object owner, Action<T> action, string name)
Parameters
Type | Name | Description |
---|---|---|
System.Object | owner | The component that owns the receiver. This is usually the state object that the receiver operates on. The receivers associated with the same owner are never executed concurrently. |
Action<T> | action | The action to execute when a message is delivered to this receiver. |
System.String | name | The debug name of the receiver. |
Returns
Type | Description |
---|---|
Receiver<T> | A new receiver. |
Type Parameters
Name | Description |
---|---|
T | The type of messages accepted by this receiver. |
CreateReceiver<T>(Object, Action<Message<T>>, String)
Creates an input receiver associated with the specified component object.
Declaration
public Receiver<T> CreateReceiver<T>(object owner, Action<Message<T>> action, string name)
Parameters
Type | Name | Description |
---|---|---|
System.Object | owner | The component that owns the receiver. This is usually the state object that the receiver operates on. The receivers associated with the same owner are never executed concurrently. |
Action<Message<T>> | action | The action to execute when a message is delivered to this receiver. |
System.String | name | The debug name of the receiver. |
Returns
Type | Description |
---|---|
Receiver<T> | A new receiver. |
Type Parameters
Name | Description |
---|---|
T | The type of messages accepted by this receiver. |
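To show how CreateReceiver and CreateEmitter fit together, here is a minimal sketch of a component with one input and one output. `Doubler` is a hypothetical class. `Emitter<T>.Post` and `Envelope.OriginatingTime` come from the wider Microsoft.Psi library and are not described in this section, so treat their use here as an assumption.

```csharp
using Microsoft.Psi;

// Hypothetical component: doubles each incoming integer and re-emits it.
public class Doubler
{
    public Doubler(Pipeline pipeline)
    {
        // The component itself is the owner of both the emitter and the receiver.
        this.Out = pipeline.CreateEmitter<int>(this, nameof(this.Out));
        this.In = pipeline.CreateReceiver<int>(this, this.Receive, nameof(this.In));
    }

    public Receiver<int> In { get; }

    public Emitter<int> Out { get; }

    // Matches the Action<T, Envelope> overload of CreateReceiver.
    private void Receive(int value, Envelope envelope)
    {
        // Assumption: Post(message, originatingTime) forwards the result while
        // preserving the originating time of the input message.
        this.Out.Post(value * 2, envelope.OriginatingTime);
    }
}
```

A stream would typically be connected to `In` with the library's stream-wiring operators (for example `PipeTo`, which is likewise outside the scope of this page).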
Dispose()
Declaration
public virtual void Dispose()
GetCurrentTime()
Get current clock time.
Declaration
public DateTime GetCurrentTime()
Returns
Type | Description |
---|---|
DateTime | Current clock time. |
GetCurrentTimeFromElapsedTicks(Int64)
Get current time, given elapsed ticks.
Declaration
public DateTime GetCurrentTimeFromElapsedTicks(long ticksFromSystemBoot)
Parameters
Type | Name | Description |
---|---|---|
System.Int64 | ticksFromSystemBoot | Ticks elapsed since system boot. |
Returns
Type | Description |
---|---|
DateTime | Current time. |
GetDefaultDeliveryPolicy<T>()
Gets the default delivery policy for a stream of given type.
Declaration
public virtual DeliveryPolicy<T> GetDefaultDeliveryPolicy<T>()
Returns
Type | Description |
---|---|
DeliveryPolicy<T> | The default delivery policy to use for that stream. |
Type Parameters
Name | Description |
---|---|
T | The type of the stream. |
Remarks
The default delivery policy is used when no delivery policy is specified when wiring the stream.
GetDefaultMessageValidator<T>()
Gets the default message validator for a stream of a given type.
Declaration
public virtual Emitter<T>.ValidateMessageHandler GetDefaultMessageValidator<T>()
Returns
Type | Description |
---|---|
Emitter<T>.ValidateMessageHandler | The default validator to use for that stream. |
Type Parameters
Name | Description |
---|---|
T | The type of the stream. |
ProposeReplayTime(TimeInterval)
Propose replay time.
Declaration
public virtual void ProposeReplayTime(TimeInterval originatingTimeInterval)
Parameters
Type | Name | Description |
---|---|---|
TimeInterval | originatingTimeInterval | Originating time interval. |
Run(DateTime, DateTime, Boolean, IProgress<Double>)
Runs the pipeline synchronously in replay mode. This method may be used when replaying data from a store.
Declaration
public void Run(DateTime replayStartTime, DateTime replayEndTime, bool enforceReplayClock = true, IProgress<double> progress = null)
Parameters
Type | Name | Description |
---|---|---|
DateTime | replayStartTime | The time at which to start replaying. |
DateTime | replayEndTime | The time at which to end replaying. |
System.Boolean | enforceReplayClock | Whether to enforce the replay clock. If true, messages retrieved from the store(s) will be delivered according to their originating times, as though they were being generated in real-time. If false, messages retrieved from store(s) will be delivered as soon as possible irrespective of their originating times. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
Run(DateTime, Boolean, IProgress<Double>)
Runs the pipeline synchronously in replay mode. This method may be used when replaying data from a store.
Declaration
public void Run(DateTime replayStartTime, bool enforceReplayClock = true, IProgress<double> progress = null)
Parameters
Type | Name | Description |
---|---|---|
DateTime | replayStartTime | Time at which to start replaying. |
System.Boolean | enforceReplayClock | Whether to enforce the replay clock. If true, messages retrieved from the store(s) will be delivered according to their originating times, as though they were being generated in real-time. If false, messages retrieved from store(s) will be delivered as soon as possible irrespective of their originating times. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
Run(ReplayDescriptor, IProgress<Double>)
Runs the pipeline synchronously.
Declaration
public void Run(ReplayDescriptor descriptor = null, IProgress<double> progress = null)
Parameters
Type | Name | Description |
---|---|---|
ReplayDescriptor | descriptor | An optional replay descriptor to apply when replaying data from a store. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
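A minimal sketch of a synchronous run with both optional arguments omitted. `Generators.Range` and the `Do` operator are assumed from the wider Microsoft.Psi library (they are not documented on this page) and serve only to give the pipeline a finite source, so that Run has something to execute and a point at which to return.

```csharp
using System;
using Microsoft.Psi;

public static class RunExample
{
    public static void Main()
    {
        using (var pipeline = Pipeline.Create())
        {
            // Assumed helper: a finite source emitting five integers, 100 ms apart.
            Generators.Range(pipeline, 0, 5, TimeSpan.FromMilliseconds(100))
                .Do(value => Console.WriteLine(value));

            // Blocks the calling thread while the pipeline runs.
            pipeline.Run();
        }
    }
}
```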
Run(TimeInterval, Boolean, IProgress<Double>)
Runs the pipeline synchronously in replay mode. This method may be used when replaying data from a store.
Declaration
public void Run(TimeInterval replayInterval, bool enforceReplayClock = true, IProgress<double> progress = null)
Parameters
Type | Name | Description |
---|---|---|
TimeInterval | replayInterval | The time interval within which to replay the data. The pipeline will commence playback at the start time of this interval, and only messages bearing an originating time within this interval will be retrieved from the store(s) contained in the pipeline and delivered. Pipeline execution will stop once all messages within this interval have been processed. |
System.Boolean | enforceReplayClock | Whether to enforce the replay clock. If true, messages retrieved from the store(s) will be delivered according to their originating times, as though they were being generated in real-time. If false, messages retrieved from store(s) will be delivered as soon as possible irrespective of their originating times. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
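The replay overloads of Run can be sketched as follows, using the `Run(DateTime, DateTime, Boolean, IProgress<Double>)` overload. Everything about the store is an assumption: `PsiStore.Open` and `OpenStream<T>` are taken from recent Microsoft.Psi releases and may differ in the version this page describes, and the store name, path, stream name, and dates are hypothetical placeholders.

```csharp
using System;
using Microsoft.Psi;

public static class ReplayExample
{
    public static void Main()
    {
        using (var pipeline = Pipeline.Create())
        {
            // Assumption: a store named "Recording" exists at this hypothetical path
            // and contains a stream of doubles called "Values".
            var store = PsiStore.Open(pipeline, "Recording", @"C:\Data\Recordings");
            store.OpenStream<double>("Values")
                .Do(value => Console.WriteLine(value));

            // Hypothetical replay window covering one minute of recorded data.
            var start = new DateTime(2021, 1, 1, 12, 0, 0, DateTimeKind.Utc);
            var end = start.AddMinutes(1);

            // enforceReplayClock: false delivers messages as fast as possible
            // instead of pacing them by their originating times.
            pipeline.Run(start, end, enforceReplayClock: false);
        }
    }
}
```

Leaving `enforceReplayClock` at its default of true would instead pace delivery according to the originating times, as the parameter description states.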
RunAsync(DateTime, DateTime, Boolean, IProgress<Double>)
Runs the pipeline asynchronously in replay mode. This method may be used when replaying data from a store.
Declaration
public IDisposable RunAsync(DateTime replayStartTime, DateTime replayEndTime, bool enforceReplayClock = true, IProgress<double> progress = null)
Parameters
Type | Name | Description |
---|---|---|
DateTime | replayStartTime | Time at which to start replaying. |
DateTime | replayEndTime | Time at which to end replaying. |
System.Boolean | enforceReplayClock | Whether to enforce the replay clock. If true, messages retrieved from the store(s) will be delivered according to their originating times, as though they were being generated in real-time. If false, messages retrieved from store(s) will be delivered as soon as possible irrespective of their originating times. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
Returns
Type | Description |
---|---|
IDisposable | An IDisposable instance which may be used to terminate the pipeline. |
RunAsync(DateTime, Boolean, IProgress<Double>)
Runs the pipeline asynchronously in replay mode. This method may be used when replaying data from a store.
Declaration
public IDisposable RunAsync(DateTime replayStartTime, bool enforceReplayClock = true, IProgress<double> progress = null)
Parameters
Type | Name | Description |
---|---|---|
DateTime | replayStartTime | Time at which to start replaying. |
System.Boolean | enforceReplayClock | Whether to enforce the replay clock. If true, messages retrieved from the store(s) will be delivered according to their originating times, as though they were being generated in real-time. If false, messages retrieved from store(s) will be delivered as soon as possible irrespective of their originating times. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
Returns
Type | Description |
---|---|
IDisposable | An IDisposable instance which may be used to terminate the pipeline. |
RunAsync(ReplayDescriptor, IProgress<Double>)
Runs the pipeline asynchronously.
Declaration
public IDisposable RunAsync(ReplayDescriptor descriptor = null, IProgress<double> progress = null)
Parameters
Type | Name | Description |
---|---|---|
ReplayDescriptor | descriptor | An optional replay descriptor to apply when replaying data from a store. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
Returns
Type | Description |
---|---|
IDisposable | An IDisposable instance which may be used to terminate the pipeline. |
RunAsync(ReplayDescriptor, Clock, IProgress<Double>)
Runs the pipeline asynchronously using the specified clock.
Declaration
protected virtual IDisposable RunAsync(ReplayDescriptor descriptor, Clock clock, IProgress<double> progress = null)
Parameters
Type | Name | Description |
---|---|---|
ReplayDescriptor | descriptor | Replay descriptor. |
Clock | clock | Clock to use (in the case of a shared scheduler - e.g. subpipeline). |
IProgress<System.Double> | progress | Progress reporter. |
Returns
Type | Description |
---|---|
IDisposable | Disposable used to terminate pipeline. |
RunAsync(TimeInterval, Boolean, IProgress<Double>)
Runs the pipeline asynchronously in replay mode. This method may be used when replaying data from a store.
Declaration
public IDisposable RunAsync(TimeInterval replayInterval, bool enforceReplayClock = true, IProgress<double> progress = null)
Parameters
Type | Name | Description |
---|---|---|
TimeInterval | replayInterval | The time interval within which to replay the data. The pipeline will commence playback at the start time of this interval, and only messages bearing an originating time within this interval will be retrieved from the store(s) contained in the pipeline and delivered. Pipeline execution will stop once all messages within this interval have been processed. |
System.Boolean | enforceReplayClock | Whether to enforce the replay clock. If true, messages retrieved from the store(s) will be delivered according to their originating times, as though they were being generated in real-time. If false, messages retrieved from store(s) will be delivered as soon as possible irrespective of their originating times. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
Returns
Type | Description |
---|---|
IDisposable | An IDisposable instance which may be used to terminate the pipeline. |
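A minimal sketch of an asynchronous run that is terminated through the returned IDisposable. `Generators.Range` and `Do` are assumed from the wider Microsoft.Psi library, and the one-second sleep is an arbitrary stand-in for whatever work the calling thread does while the pipeline runs.

```csharp
using System;
using System.Threading;
using Microsoft.Psi;

public static class RunAsyncExample
{
    public static void Main()
    {
        var pipeline = Pipeline.Create();

        // Assumed helper: a source emitting 1000 integers, 10 ms apart.
        Generators.Range(pipeline, 0, 1000, TimeSpan.FromMilliseconds(10))
            .Do(value => Console.WriteLine(value));

        // RunAsync returns immediately; the pipeline keeps running in the background.
        IDisposable handle = pipeline.RunAsync();

        // The calling thread is free to do other work in the meantime.
        Thread.Sleep(TimeSpan.FromSeconds(1));

        // Per the Returns table, disposing the handle terminates the pipeline.
        handle.Dispose();
    }
}
```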
WaitAll(Int32)
Wait for all components to complete.
Declaration
public bool WaitAll(int millisecondsTimeout = Timeout.Infinite)
Parameters
Type | Name | Description |
---|---|---|
System.Int32 | millisecondsTimeout | Timeout (milliseconds). |
Returns
Type | Description |
---|---|
System.Boolean | Success. |
WaitAll(TimeSpan)
Wait for all components to complete.
Declaration
public bool WaitAll(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | Timeout. |
Returns
Type | Description |
---|---|
System.Boolean | Success. |
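WaitAll pairs naturally with RunAsync: start the pipeline in the background, then block with a timeout until its components complete. This sketch assumes that a return value of true means completion happened within the timeout, which is how the "Success" description reads. `Generators.Range` and `Do` are again assumed from the wider Microsoft.Psi library.

```csharp
using System;
using Microsoft.Psi;

public static class WaitAllExample
{
    public static void Main()
    {
        using (var pipeline = Pipeline.Create())
        {
            // Assumed helper: a finite source emitting five integers, 50 ms apart.
            Generators.Range(pipeline, 0, 5, TimeSpan.FromMilliseconds(50))
                .Do(value => Console.WriteLine(value));

            pipeline.RunAsync();

            // Wait up to ten seconds for all components to complete.
            bool completed = pipeline.WaitAll(TimeSpan.FromSeconds(10));
            Console.WriteLine(completed ? "Pipeline completed." : "Timed out.");
        }
    }
}
```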
Events
ComponentCompleted
Event that is raised upon component completion.
Declaration
public event EventHandler<ComponentCompletedEventArgs> ComponentCompleted
Event Type
Type | Description |
---|---|
EventHandler<ComponentCompletedEventArgs> | |
PipelineCompleted
Event that is raised upon pipeline completion.
Declaration
public event EventHandler<PipelineCompletedEventArgs> PipelineCompleted
Event Type
Type | Description |
---|---|
EventHandler<PipelineCompletedEventArgs> | |
PipelineExceptionNotHandled
Event that is raised when one or more unhandled exceptions occur in the pipeline. If a handler is attached to this event, any unhandled exceptions during pipeline execution will not be thrown, and will instead be handled by the attached handler. If no handler is attached, unhandled exceptions will be thrown within the execution context in which the exception occurred if the pipeline was run asynchronously via one of the RunAsync methods. This could cause the application to terminate abruptly. If the pipeline was run synchronously via one of the Run methods, an AggregateException will be thrown from the Run method (which may be caught).
Declaration
public event EventHandler<PipelineExceptionNotHandledEventArgs> PipelineExceptionNotHandled
Event Type
Type | Description |
---|---|
EventHandler<PipelineExceptionNotHandledEventArgs> | |
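The two behaviors described for PipelineExceptionNotHandled can be sketched side by side: catching the AggregateException from a synchronous Run, and attaching a handler so that nothing is thrown. The failing `Do` callback is a contrived example, `Generators.Range` and `Do` are assumed from the wider Microsoft.Psi library, and the handler deliberately ignores its event arguments because their members are not documented in this section.

```csharp
using System;
using Microsoft.Psi;

public static class ExceptionHandlingExample
{
    public static void Main()
    {
        RunAndCatch();
        RunWithHandler();
    }

    // Option 1: no handler attached, so a synchronous Run throws AggregateException.
    private static void RunAndCatch()
    {
        using (var pipeline = Pipeline.Create())
        {
            AddFailingStream(pipeline);
            try
            {
                pipeline.Run();
            }
            catch (AggregateException ex)
            {
                foreach (Exception inner in ex.InnerExceptions)
                {
                    Console.WriteLine($"Caught: {inner.Message}");
                }
            }
        }
    }

    // Option 2: a handler is attached, so unhandled exceptions are routed to it.
    private static void RunWithHandler()
    {
        using (var pipeline = Pipeline.Create())
        {
            pipeline.PipelineExceptionNotHandled += (sender, e) =>
                Console.WriteLine("An unhandled pipeline exception was reported.");
            AddFailingStream(pipeline);
            pipeline.Run();
        }
    }

    // Contrived stream whose callback throws on the second message.
    private static void AddFailingStream(Pipeline pipeline)
    {
        Generators.Range(pipeline, 0, 3, TimeSpan.FromMilliseconds(10))
            .Do(value =>
            {
                if (value == 1)
                {
                    throw new InvalidOperationException("Simulated failure");
                }
            });
    }
}
```

Attaching a handler matters most with the RunAsync methods, where an unhandled exception would otherwise surface on the thread where it occurred and could terminate the application.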
PipelineRun
Event that is raised when the pipeline starts running.
Declaration
public event EventHandler<PipelineRunEventArgs> PipelineRun
Event Type
Type | Description |
---|---|
EventHandler<PipelineRunEventArgs> | |
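A minimal sketch of subscribing to the lifecycle events listed above. The handlers only print a message and do not read the event arguments, since the members of the `...EventArgs` types are not covered in this section. `Generators.Range` and `Do` are assumed from the wider Microsoft.Psi library.

```csharp
using System;
using Microsoft.Psi;

public static class LifecycleEventsExample
{
    public static void Main()
    {
        using (var pipeline = Pipeline.Create(name: "Demo"))
        {
            // Subscribe before running so that no event is missed.
            pipeline.PipelineRun += (sender, e) =>
                Console.WriteLine("Pipeline started running.");
            pipeline.ComponentCompleted += (sender, e) =>
                Console.WriteLine("A component completed.");
            pipeline.PipelineCompleted += (sender, e) =>
                Console.WriteLine("Pipeline completed.");

            // Assumed helper: a finite source so that the pipeline can complete.
            Generators.Range(pipeline, 0, 3, TimeSpan.FromMilliseconds(50))
                .Do(value => Console.WriteLine(value));

            pipeline.Run();
        }
    }
}
```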