Class PsiStore
Provides static methods to access multi-stream \psi stores.
Namespace: Microsoft.Psi
Assembly: Microsoft.Psi.dll
Syntax
public static class PsiStore : object
Methods
View SourceConcatenate(IEnumerable<(String Name, String Path)>, (String Name, String Path), Boolean, IProgress<Double>, Action<String>)
Concatenates a set of \psi stores, generating a new store.
Declaration
public static void Concatenate(IEnumerable<(string Name, string Path)> storeFiles, (string Name, string Path) output, bool createSubdirectory = true, IProgress<double> progress = null, Action<string> loggingCallback = null)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<System.ValueTuple<String, String>> | storeFiles | Set of store files (name, path pairs) to concatenate. |
System.ValueTuple<String, String> | output | Output store (name, path pair). |
System.Boolean | createSubdirectory | Indicates whether to create a numbered subdirectory for each concatenated store generated by multiple calls to this method. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
Action<String> | loggingCallback | An optional callback to which human-friendly information will be logged. |
Remarks
Streams of the same name across stores must also have the same types as well as non-intersecting originating times.
Copy((String Name, String Path), (String Name, String Path), Func<PsiImporter, TimeInterval>, Predicate<IStreamMetadata>, Boolean, IProgress<Double>, Action<String>)
Copies a \psi store, or a subset of it.
Declaration
public static void Copy((string Name, string Path) input, (string Name, string Path) output, Func<PsiImporter, TimeInterval> cropIntervalFunction = null, Predicate<IStreamMetadata> includeStreamPredicate = null, bool createSubdirectory = true, IProgress<double> progress = null, Action<string> loggingCallback = null)
Parameters
Type | Name | Description |
---|---|---|
System.ValueTuple<String, String> | input | The name and path of the store to crop. |
System.ValueTuple<String, String> | output | The name and path of the cropped store. |
Func<PsiImporter, TimeInterval> | cropIntervalFunction | An optional function that defines an originating time interval to copy. By default, the extents of the entire store. |
Predicate<IStreamMetadata> | includeStreamPredicate | An optional predicate that specifies which streams to include. By default, include all streams. By default, all streams are copied. |
System.Boolean | createSubdirectory | Indicates whether to create a numbered subdirectory for each cropped store generated by multiple calls to this method. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
Action<String> | loggingCallback | An optional callback to which human-friendly information will be logged. |
Create(Pipeline, String, String, Boolean, KnownSerializers)
Creates a new multi-stream \psi store and returns an Exporter instance which can be used to write streams to this store.
Declaration
public static PsiExporter Create(Pipeline pipeline, string name, string rootPath, bool createSubdirectory = true, KnownSerializers serializers = null)
Parameters
Type | Name | Description |
---|---|---|
Pipeline | pipeline | The pipeline to add the component to. |
String | name | The name of the store to create. |
String | rootPath | The path to use. If null, an in-memory store is created. |
System.Boolean | createSubdirectory | Indicates whether to create a numbered subdirectory for each execution of the pipeline. |
KnownSerializers | serializers | An optional collection of custom serializers to use instead of the default ones. |
Returns
Type | Description |
---|---|
PsiExporter | An Exporter instance that can be used to write streams. |
Remarks
The Exporter maintains a collection of serializers it knows about, which it uses to serialize the data it writes to the store. By default, the Exporter derives the correct serializers from the TMessage type argument passed to Microsoft.Psi.Data.Exporter.Write(Microsoft.Psi.Emitter{Microsoft.Psi.Message{Microsoft.Psi.Common.BufferReader}},Microsoft.Psi.PsiStreamMetadata,Microsoft.Psi.DeliveryPolicy{Microsoft.Psi.Message{Microsoft.Psi.Common.BufferReader}}). In other words, for the most part simply knowing the stream type is sufficient to determine all the types needed to serialize the messages in the stream. Use the KnownSerializers parameter to override the default behavior and provide a custom set of serializers.
Crop((String Name, String Path), (String Name, String Path), TimeInterval, Boolean, IProgress<Double>, Action<String>)
Crops a \psi store between the extents of a specified originating time interval, generating a new store.
Declaration
public static void Crop((string Name, string Path) input, (string Name, string Path) output, TimeInterval cropInterval, bool createSubdirectory = true, IProgress<double> progress = null, Action<string> loggingCallback = null)
Parameters
Type | Name | Description |
---|---|---|
System.ValueTuple<String, String> | input | The name and path of the store to crop. |
System.ValueTuple<String, String> | output | The name and path of the cropped store. |
TimeInterval | cropInterval | The originating time interval to which to crop the store. |
System.Boolean | createSubdirectory | Indicates whether to create a numbered subdirectory for each cropped store generated by multiple calls to this method. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
Action<String> | loggingCallback | An optional callback to which human-friendly information will be logged. |
Crop((String Name, String Path), (String Name, String Path), TimeSpan, RelativeTimeInterval, Boolean, IProgress<Double>, Action<String>)
Crops a \psi store between the extents of a specified interval, generating a new store.
Declaration
public static void Crop((string Name, string Path) input, (string Name, string Path) output, TimeSpan start, RelativeTimeInterval length, bool createSubdirectory = true, IProgress<double> progress = null, Action<string> loggingCallback = null)
Parameters
Type | Name | Description |
---|---|---|
System.ValueTuple<String, String> | input | The name and path of the store to crop. |
System.ValueTuple<String, String> | output | The name and path of the cropped store. |
TimeSpan | start | Start of crop interval relative to beginning of store. |
RelativeTimeInterval | length | Length of crop interval. |
System.Boolean | createSubdirectory | Indicates whether to create a numbered subdirectory for each cropped store generated by multiple calls to this method. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
Action<String> | loggingCallback | An optional callback to which human-friendly information will be logged. |
CropInPlace((String Name, String Path), TimeInterval, Boolean, IProgress<Double>, Action<String>)
Crops a \psi store in place between the extents of a specified originating time interval.
Declaration
public static void CropInPlace((string Name, string Path) input, TimeInterval cropInterval, bool deleteOriginalStore = true, IProgress<double> progress = null, Action<string> loggingCallback = null)
Parameters
Type | Name | Description |
---|---|---|
System.ValueTuple<String, String> | input | The name and path of the store to crop. |
TimeInterval | cropInterval | The originating time interval to which to crop the store. |
System.Boolean | deleteOriginalStore | Indicates whether the original store should be deleted. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
Action<String> | loggingCallback | An optional callback to which human-friendly information will be logged. |
CropInPlace((String Name, String Path), TimeSpan, RelativeTimeInterval, Boolean, IProgress<Double>, Action<String>)
Crops a \psi store in place between the extents of a specified interval.
Declaration
public static void CropInPlace((string Name, string Path) input, TimeSpan start, RelativeTimeInterval length, bool deleteOriginalStore = true, IProgress<double> progress = null, Action<string> loggingCallback = null)
Parameters
Type | Name | Description |
---|---|---|
System.ValueTuple<String, String> | input | The name and path of the store to crop. |
TimeSpan | start | Start of crop interval relative to beginning of store. |
RelativeTimeInterval | length | Length of crop interval. |
System.Boolean | deleteOriginalStore | Indicates whether the original store should be deleted. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
Action<String> | loggingCallback | An optional callback to which human-friendly information will be logged. |
Delete((String Name, String Path), Boolean)
Delete a \psi store.
Declaration
public static void Delete((string Name, string Path) store, bool deleteDirectoryIfOtherwiseEmpty = false)
Parameters
Type | Name | Description |
---|---|---|
System.ValueTuple<String, String> | store | The name and path of the store to delete. |
System.Boolean | deleteDirectoryIfOtherwiseEmpty | Whether to delete the containing directory if it is empty after removing store files. |
EnumerateStores(String, Boolean)
Enumerates all stores under a given path.
Declaration
public static IEnumerable<(string Name, string Path)> EnumerateStores(string rootPath, bool recursively = true)
Parameters
Type | Name | Description |
---|---|---|
String | rootPath | The root path to search. |
System.Boolean | recursively | A value indicating whether to search recursively. |
Returns
Type | Description |
---|---|
IEnumerable<System.ValueTuple<String, String>> | An enumeration of names and paths to stores found. |
Exists(String, String)
Indicates whether the specified \psi store file exists.
Declaration
public static bool Exists(string name, string path)
Parameters
Type | Name | Description |
---|---|---|
String | name | The name of the store to check. |
String | path | The path of the store to check. |
Returns
Type | Description |
---|---|
System.Boolean | Returns true if the store exists. |
GetPathToLatestVersion(String, String)
Get path to latest version of store.
Declaration
public static string GetPathToLatestVersion(string storeName, string rootPath)
Parameters
Type | Name | Description |
---|---|---|
String | storeName | The name of the store. |
String | rootPath | The root path of the store. |
Returns
Type | Description |
---|---|
String | Path to latest version of store. |
IsClosed(String, String)
Indicates whether all streams in a \psi store have been marked as "closed".
Declaration
public static bool IsClosed(string name, string path)
Parameters
Type | Name | Description |
---|---|---|
String | name | The name of the store to check. |
String | path | The path of the store to check. |
Returns
Type | Description |
---|---|
System.Boolean | Returns true if all streams in the store are marked as closed. |
Open(Pipeline, String, String, Boolean)
Opens a \psi store for read and returns a PsiImporter instance which can be used to inspect the store and open the streams. The store metadata is available immediately after this call (before the pipeline is running) via the AvailableStreams property.
Declaration
public static PsiImporter Open(Pipeline pipeline, string name, string rootPath, bool usePerStreamReaders = true)
Parameters
Type | Name | Description |
---|---|---|
Pipeline | pipeline | The pipeline to add the component to. |
String | name | The name of the store to open (the same as the catalog file name). |
String | rootPath | The path to the store. This can be one of:
|
System.Boolean | usePerStreamReaders | Optional flag indicating whether to use per-stream readers (see remarks). |
Returns
Type | Description |
---|---|
PsiImporter | A PsiImporter instance that can be used to open streams and read messages. |
Remarks
The PsiImporter maintains a collection of serializers it knows about, which it uses to deserialize the data it reads form the store. By default, the PsiImporter derives the correct serializers from the type argument passed to OpenStream<T>(String, Func<T>, Action<T>). In other words, for the most part simply knowing the stream type is sufficient to determine all the types needed to deserialize the messages in the stream. However, there are two cases when this automatic behavior might not work:
- When one of the required types changed between the version used to serialize the file and the current version, in a way that breaks versioning rules. Use the Register<T>(String, CloningFlags) method to remap the name of the old type to a new, compatible type.
- When the declared type of a field is different than the actual value assigned to it (polymorphic fields) and the value assigned is of a type that implements the DataContract serialization rules. In this case, use the Register<T>(CloningFlags) method to let the serialization system know which compatible concrete type to use for that DataContract name. Additionally, the usePerStreamReaders flag causes the importer to create separate readers for each opened stream. When a store is written, messages from multiple streams are serialized into store files as they arrive at the Exporter. Messages within a stream are guaranteed to be persisted in time-order. However, messages across multiple streams are interleaved and there is no guarantee that the interleaving preserves time-ordering. A single stream reader emits messages in the originally interleaved order. Using a single stream reader results in a delay of messages that may come before other messages in time, but physically after them in the persisted store. This results in an apparent emitted latency. Using individual stream readers (usePerStreamReaders=true) allows messages to be emitted at a pipeline time approximating the original creation time; regardless of physical interleaved ordering. The benefit is a better approximation of the live conditions during replay, while the drawback is a negligible performance impact.
Process(Func<IStreamMetadata, Boolean>, Action<IStreamMetadata, PsiImporter, Exporter>, (String Name, String Path), (String Name, String Path), Boolean, IProgress<Double>, Action<String>)
Processes a \psi store, generating a new store.
Declaration
public static void Process(Func<IStreamMetadata, bool> predicate, Action<IStreamMetadata, PsiImporter, Exporter> processor, (string Name, string Path) input, (string Name, string Path) output, bool createSubdirectory = true, IProgress<double> progress = null, Action<string> loggingCallback = null)
Parameters
Type | Name | Description |
---|---|---|
Func<IStreamMetadata, System.Boolean> | predicate | Predicate function determining whether to process a stream or else should be merely copied. |
Action<IStreamMetadata, PsiImporter, Exporter> | processor | Processor action given stream metadata, importer and exporter. |
System.ValueTuple<String, String> | input | The name and path of the store to process. |
System.ValueTuple<String, String> | output | The name and path of the processed store. |
System.Boolean | createSubdirectory | Indicates whether to create a numbered subdirectory for each store generated by multiple calls to this method. |
IProgress<System.Double> | progress | An optional progress reporter for progress updates. |
Action<String> | loggingCallback | An optional callback to which human-friendly information will be logged. |
Repair(String, String, Boolean, IProgress<Double>)
Repairs an invalid \psi store in place.
Declaration
public static void Repair(string name, string path, bool deleteOriginalStore = true, IProgress<double> progress = null)
Parameters
Type | Name | Description |
---|---|---|
String | name | The name of the store to check. |
String | path | The path of the store to check. |
System.Boolean | deleteOriginalStore | Indicates whether the original store should be deleted. |
IProgress<System.Double> | progress | An optional progress updates receiver. |
RepairAsync(String, String, Boolean, IProgress<Double>)
Repairs an invalid \psi store in place.
Declaration
public static async Task RepairAsync(string name, string path, bool deleteOriginalStore = true, IProgress<double> progress = null)
Parameters
Type | Name | Description |
---|---|---|
String | name | The name of the store to check. |
String | path | The path of the store to check. |
System.Boolean | deleteOriginalStore | Indicates whether the original store should be deleted. |
IProgress<System.Double> | progress | An optional progress updates receiver. |
Returns
Type | Description |
---|---|
Task | The task for repairing an invalid \psi store in place. |
SummarizeDistinctKeysInSupplementalMetadata<TKey, TValue>(IProducer<Dictionary<TKey, TValue>>, Exporter)
Stores supplemental metadata representing distinct dictionary keys seen on the stream.
Declaration
public static void SummarizeDistinctKeysInSupplementalMetadata<TKey, TValue>(this IProducer<Dictionary<TKey, TValue>> source, Exporter writer)
Parameters
Type | Name | Description |
---|---|---|
IProducer<Dictionary<TKey, TValue>> | source | The source stream to write. |
Exporter | writer | The store writer, created by e.g. Create(Pipeline, String, String, Boolean, KnownSerializers). |
Type Parameters
Name | Description |
---|---|
TKey | The type of dictionary key in the stream. |
TValue | The type of dictionary value in the stream. |
TryGetStreamMetadata(Pipeline, String, out IStreamMetadata)
Returns the metadata associated with the specified stream, if the stream is persisted to a \psi store.
Declaration
public static bool TryGetStreamMetadata(Pipeline pipeline, string streamName, out IStreamMetadata metadata)
Parameters
Type | Name | Description |
---|---|---|
Pipeline | pipeline | The pipeline to add the component to. |
String | streamName | The name of the stream to retrieve metadata about. |
IStreamMetadata | metadata | Upon return, this parameter contains the metadata associated with the stream, or null if the stream is not persisted. |
Returns
Type | Description |
---|---|
System.Boolean | True if the stream is persisted to a store, false otherwise. |
TryGetStreamMetadata<T>(IProducer<T>, out IStreamMetadata)
Returns the metadata associated with the specified stream, if the stream is persisted to a \psi store.
Declaration
public static bool TryGetStreamMetadata<T>(IProducer<T> source, out IStreamMetadata metadata)
Parameters
Type | Name | Description |
---|---|---|
IProducer<T> | source | The stream to retrieve metadata about. |
IStreamMetadata | metadata | Upon return, this parameter contains the metadata associated with the stream, or null if the stream is not persisted. |
Returns
Type | Description |
---|---|
System.Boolean | True if the stream is persisted to a store, false otherwise. |
Type Parameters
Name | Description |
---|---|
T | The type of stream messages. |
Write<TMessage>(IProducer<TMessage>, String, Exporter, Boolean, DeliveryPolicy<TMessage>)
Writes the specified stream to a multi-stream \psi store.
Declaration
public static IProducer<TMessage> Write<TMessage>(this IProducer<TMessage> source, string name, Exporter writer, bool largeMessages = false, DeliveryPolicy<TMessage> deliveryPolicy = null)
Parameters
Type | Name | Description |
---|---|---|
IProducer<TMessage> | source | The source stream to write. |
String | name | The name of the persisted stream. |
Exporter | writer | The store writer, created by e.g. Create(Pipeline, String, String, Boolean, KnownSerializers). |
System.Boolean | largeMessages | Indicates whether the stream contains large messages (typically >4k). If true, the messages will be written to the large message file. |
DeliveryPolicy<TMessage> | deliveryPolicy | An optional delivery policy. |
Returns
Type | Description |
---|---|
IProducer<TMessage> | The input stream. |
Type Parameters
Name | Description |
---|---|
TMessage | The type of messages in the stream. |
Write<TMessage, TSupplementalMetadata>(IProducer<TMessage>, TSupplementalMetadata, String, Exporter, Boolean, DeliveryPolicy<TMessage>)
Writes the specified stream to a multi-stream \psi store.
Declaration
public static IProducer<TMessage> Write<TMessage, TSupplementalMetadata>(this IProducer<TMessage> source, TSupplementalMetadata supplementalMetadataValue, string name, Exporter writer, bool largeMessages = false, DeliveryPolicy<TMessage> deliveryPolicy = null)
Parameters
Type | Name | Description |
---|---|---|
IProducer<TMessage> | source | The source stream to write. |
TSupplementalMetadata | supplementalMetadataValue | Supplemental metadata value. |
String | name | The name of the persisted stream. |
Exporter | writer | The store writer, created by e.g. Create(Pipeline, String, String, Boolean, KnownSerializers). |
System.Boolean | largeMessages | Indicates whether the stream contains large messages (typically >4k). If true, the messages will be written to the large message file. |
DeliveryPolicy<TMessage> | deliveryPolicy | An optional delivery policy. |
Returns
Type | Description |
---|---|
IProducer<TMessage> | The input stream. |
Type Parameters
Name | Description |
---|---|
TMessage | The type of messages in the stream. |
TSupplementalMetadata | The type of supplemental stream metadata. |
WriteEnvelopes<TIn>(IProducer<TIn>, String, Exporter, DeliveryPolicy<TIn>)
Writes the envelopes for the specified stream to a multi-stream \psi store.
Declaration
public static IProducer<TIn> WriteEnvelopes<TIn>(this IProducer<TIn> source, string name, Exporter writer, DeliveryPolicy<TIn> deliveryPolicy = null)
Parameters
Type | Name | Description |
---|---|---|
IProducer<TIn> | source | The source stream for which to write envelopes. |
String | name | The name of the persisted stream. |
Exporter | writer | The store writer, created by e.g. Create(Pipeline, String, String, Boolean, KnownSerializers). |
DeliveryPolicy<TIn> | deliveryPolicy | An optional delivery policy. |
Returns
Type | Description |
---|---|
IProducer<TIn> | The input stream. |
Type Parameters
Name | Description |
---|---|
TIn | The type of messages in the stream. |