Dataset

A Dataset is a wrapper around a List of examples. Datasets are responsible for tracking all Operations run on them. This preserves data lineage and makes it easy to report how the changes each Operation makes to the data affect overall quality.

Dataset holds state (let's call it self.operations for now).
self.operations is a list of every function run on the Dataset since its
initial creation. When loading from disk, everything that happens during the loading
phase is tracked in operations as well, simply by initializing self.operations in the constructors.

Each operation should have the following attributes:
    operation hash
    name: function/callable name ideally, could be added with a decorator
    status: (not_started|completed)
    transformations: List[Transformation]
        commit hash
        timestamp(s) - start and end both? end is probably enough
        examples deleted
        examples added
        examples corrected
        annotations deleted
        annotations added
        annotations corrected

        for annotations deleted/added/corrected, include mapping from old Example hash to new Example hash
        that can be decoded for display later
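
The attribute list above could be modelled roughly as follows. This is a minimal sketch, not recon's actual types: the class names `OperationSketch` and `TransformationSketch`, the `kind` field, and the example hashes are all hypothetical, chosen only to show how per-example hash mappings might sit next to the summary counts.

```python
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List, Optional


class Status(str, Enum):
    NOT_STARTED = "not_started"
    COMPLETED = "completed"


@dataclass
class TransformationSketch:
    # Hypothetical record of one change: old Example hash -> new Example hash.
    prev_example: str
    example: Optional[str]  # None when the example was deleted
    kind: str  # e.g. "annotation_added", "annotation_corrected"


@dataclass
class OperationSketch:
    name: str
    status: Status = Status.NOT_STARTED
    ts: Optional[datetime] = None  # end timestamp only, as the notes suggest
    examples_added: int = 0
    examples_removed: int = 0
    examples_changed: int = 0
    transformations: List[TransformationSketch] = field(default_factory=list)


# Record one corrected annotation, then mark the operation as finished.
op = OperationSketch(name="fix_annotations")
op.transformations.append(TransformationSketch("a1f3", "9c2e", "annotation_corrected"))
op.examples_changed = len(op.transformations)
op.status = Status.COMPLETED
op.ts = datetime.now()
```

Keeping the old-to-new hash pairs on each transformation is what would let a report decode them back into readable examples later.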

All operations are serializable in the to_disk and from_disk methods.

So, say I have 10 possible transformations.

I can run 1..5, save to disk, train a model and check the results.
Then I can load that dataset from disk with all previous operations already tracked
in self.operations. Then I can run 6..10, save to disk and train another model.
Now I have git-like "commits" for the data used in each model.
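
The save, reload, continue workflow can be illustrated with a toy stand-in. The sketch below does not use recon itself: `TinyDataset`, its `run` method, and the single JSON file layout are assumptions made up for the example, and the two functions stand in for the numbered transformations.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, Dict, List


class TinyDataset:
    """Toy stand-in for Dataset: a list of strings plus an operation log."""

    def __init__(self, data: List[str]) -> None:
        self.data = data
        self.operations: List[Dict[str, str]] = []

    def run(self, func: Callable[[List[str]], List[str]]) -> None:
        # Apply the function in place and record it in the log.
        self.data = func(self.data)
        self.operations.append({"name": func.__name__, "status": "completed"})

    def to_disk(self, path: Path) -> None:
        path.write_text(json.dumps({"data": self.data, "operations": self.operations}))

    @classmethod
    def from_disk(cls, path: Path) -> "TinyDataset":
        state = json.loads(path.read_text())
        ds = cls(state["data"])
        ds.operations = state["operations"]  # history survives the round trip
        return ds


def strip_whitespace(data: List[str]) -> List[str]:
    return [d.strip() for d in data]


def lowercase(data: List[str]) -> List[str]:
    return [d.lower() for d in data]


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "train.json"
    ds = TinyDataset([" Apple ", "BANANA"])
    ds.run(strip_whitespace)
    ds.to_disk(path)  # first "commit": train a model on this snapshot

    reloaded = TinyDataset.from_disk(path)
    reloaded.run(lowercase)  # second batch of transformations
    history = [op["name"] for op in reloaded.operations]
    final_data = reloaded.data
```

After the second session, `history` lists both functions in order, which is the lineage a later report would read.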

apply(self, func, *args, **kwargs)

Show source code in recon/dataset.py
    def apply(
        self, func: Callable[[List[Example], Any, Any], Any], *args: Any, **kwargs: Any
    ) -> Any:
        """Apply a function to the dataset

        Args:
            func (Callable[[List[Example], Any, Any], Any]): 
                Function from an existing recon module that can operate on a List of examples

        Returns:
            Result of running func on List of examples
        """
        return func(self.data, *args, **kwargs)  # type: ignore

Apply a function to the dataset

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| func | Callable[[List[recon.types.Example], Any, Any], Any] | Function from an existing recon module that can operate on a List of examples | required |

Returns

| Type | Description |
| --- | --- |
| Any | Result of running func on List of examples |
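
A possible usage pattern for apply, sketched without importing recon. `MiniDataset` reuses the one-line body from the source above, while `count_labels`, `ExampleDict`, and the dict-based example shape are hypothetical stand-ins for a recon function and `recon.types.Example`.

```python
from collections import Counter
from typing import Any, Callable, Dict, List

ExampleDict = Dict[str, Any]  # hypothetical stand-in for recon.types.Example


class MiniDataset:
    """Stand-in that reuses the one-line apply() body from the source above."""

    def __init__(self, data: List[ExampleDict]) -> None:
        self.data = data

    def apply(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        return func(self.data, *args, **kwargs)


def count_labels(examples: List[ExampleDict], min_count: int = 1) -> Dict[str, int]:
    """Count span labels, keeping only labels seen at least min_count times."""
    counts = Counter(span["label"] for e in examples for span in e["spans"])
    return {label: n for label, n in counts.items() if n >= min_count}


ds = MiniDataset(
    [
        {"text": "Ada joined Acme", "spans": [{"label": "PERSON"}, {"label": "ORG"}]},
        {"text": "Ada left", "spans": [{"label": "PERSON"}]},
    ]
)
all_counts = ds.apply(count_labels)  # extra args are simply forwarded
frequent = ds.apply(count_labels, min_count=2)
```

Because apply only returns the function's result, the dataset's data and its operation log are left untouched, which suits read-only statistics.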

apply_(self, operation, *args, initial_state=None, **kwargs)

Show source code in recon/dataset.py
    def apply_(
        self,
        operation: Union[str, Callable[[Any], OperationResult]],
        *args: Any,
        initial_state: OperationState = None,
        **kwargs: Any,
    ) -> None:
        """Apply an operation to all data inplace.

        Args:
            operation (Callable[[Any], OperationResult]): Any operation that
                changes data in place. See recon.operations.registry.operations
        """
        if isinstance(operation, str):
            operation = registry.operations.get(operation)
            if operation:
                operation = cast(Callable, operation)

        name = getattr(operation, "name", None)
        if name is None or name not in registry.operations:
            raise ValueError(
                "This function is not an operation. Ensure your function is registered in the operations registry."
            )

        result: OperationResult = operation(self, *args, initial_state=initial_state, **kwargs)  # type: ignore
        self.operations.append(result.state)
        dataset_changed = any(
            (
                result.state.examples_added,
                result.state.examples_removed,
                result.state.examples_changed,
            )
        )
        if dataset_changed:
            self.data = result.data

Apply an operation to all data in place.

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| operation | Union[str, Callable[[Any], recon.types.OperationResult]] | Any operation that changes data in place. See recon.operations.registry.operations | required |
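
The control flow of apply_ (resolve a name, reject anything unregistered, log the state, swap the data only when something changed) can be sketched in isolation. Everything named here is a stand-in: `OPERATIONS` plays the role of registry.operations, and `register`, `State`, `Result`, and `drop_empty` are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

OPERATIONS: Dict[str, Callable] = {}  # hypothetical stand-in for registry.operations


def register(name: str) -> Callable:
    def wrap(func: Callable) -> Callable:
        func.name = name  # apply_ looks up this attribute
        OPERATIONS[name] = func
        return func

    return wrap


@dataclass
class State:
    name: str
    examples_added: int = 0
    examples_removed: int = 0
    examples_changed: int = 0


@dataclass
class Result:
    data: List[str]
    state: State


class MiniDataset:
    def __init__(self, data: List[str]) -> None:
        self.data = data
        self.operations: List[State] = []

    def apply_(self, operation: Any, *args: Any, **kwargs: Any) -> None:
        if isinstance(operation, str):
            operation = OPERATIONS.get(operation)
        name = getattr(operation, "name", None)
        if name is None or name not in OPERATIONS:
            raise ValueError("This function is not a registered operation.")
        result = operation(self, *args, **kwargs)
        self.operations.append(result.state)  # logged even if nothing changed
        s = result.state
        if any((s.examples_added, s.examples_removed, s.examples_changed)):
            self.data = result.data


@register("drop_empty")
def drop_empty(ds: MiniDataset) -> Result:
    kept = [d for d in ds.data if d]
    return Result(kept, State("drop_empty", examples_removed=len(ds.data) - len(kept)))


ds = MiniDataset(["a", "", "b"])
ds.apply_("drop_empty")  # resolved by name through the registry

try:
    ds.apply_(len)  # a plain callable without a registered name is rejected
    rejected = False
except ValueError:
    rejected = True
```

Requiring registration means every entry in the operation log has a name that can be looked up again later.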

from_disk(self, path, loader_func=read_jsonl)

Show source code in recon/dataset.py
    def from_disk(self, path: Path, loader_func: Callable = read_jsonl) -> "Dataset":
        """Load Dataset from disk given a path and a loader function that reads the data
        and returns an iterator of Examples

        Args:
            path (Path): path to load from
            loader_func (Callable, optional): Callable that reads a file and returns a List of examples. 
                Defaults to [read_jsonl][recon.loaders.read_jsonl]
        """
        path = ensure_path(path)
        ds_op_state = None
        if (path.parent / ".recon" / self.name).exists():
            state = srsly.read_json(path.parent / ".recon" / self.name / "state.json")
            ds_op_state = DatasetOperationsState(**state)
            self.operations = ds_op_state.operations

        data = loader_func(path)
        self.data = data

        if ds_op_state and self.commit_hash != ds_op_state.commit:
            # Dataset changed, examples added
            self.operations.append(
                OperationState(
                    name="examples_added_external",
                    status=OperationStatus.COMPLETED,
                    ts=datetime.now(),
                    examples_added=max(len(self) - ds_op_state.size, 0),
                    examples_removed=max(ds_op_state.size - len(self), 0),
                    examples_changed=0,
                    transformations=[],
                )
            )

            for op in self.operations:
                op.status = OperationStatus.NOT_STARTED

        seen: Set[str] = set()
        operations_to_run: Dict[str, OperationState] = {}

        for op in self.operations:
            if (
                op.name not in operations_to_run
                and op.name in registry.operations
                and op.status != OperationStatus.COMPLETED
            ):
                operations_to_run[op.name] = op

        for op_name, state in operations_to_run.items():
            op = registry.operations.get(op_name)
            self.apply_(op, *state.args, initial_state=state, **state.kwargs)  # type: ignore

        return self

Load Dataset from disk given a path and a loader function that reads the data and returns an iterator of Examples

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Path | path to load from | required |
| loader_func | Callable | Callable that reads a file and returns a List of examples. Defaults to read_jsonl | read_jsonl |
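
The replay step at the end of from_disk keeps the first state per operation name, and only if that name is registered and the state is not already completed. The sketch below isolates that selection with plain dicts. The function name `select_operations_to_run` and the sample operation names are hypothetical.

```python
from typing import Dict, List, Set


def select_operations_to_run(
    operations: List[Dict[str, str]], registered: Set[str]
) -> List[str]:
    """Return operation names to replay, in first-seen order."""
    to_run: Dict[str, Dict[str, str]] = {}
    for op in operations:
        if (
            op["name"] not in to_run
            and op["name"] in registered
            and op["status"] != "completed"
        ):
            to_run[op["name"]] = op
    return list(to_run)


ops = [
    {"name": "fix_labels", "status": "not_started"},
    {"name": "fix_labels", "status": "not_started"},  # duplicate name, skipped
    {"name": "upcase", "status": "completed"},  # already done, skipped
    {"name": "examples_added_external", "status": "not_started"},  # not registered
]
names = select_operations_to_run(ops, registered={"fix_labels", "upcase"})
```

In the source shown, a commit hash mismatch resets every stored operation to NOT_STARTED first, so after an external edit to the file all registered operations become eligible for replay.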

pipe_(self, operations)

Show source code in recon/dataset.py
    def pipe_(self, operations: List[Union[str, OperationState]]) -> None:
        """Run a sequence of operations on dataset data.
        Internally calls Dataset.apply_ and will resolve named
        operations in registry.operations

        Args:
            operations (List[Union[str, OperationState]]): List of operations
        """
        for op in operations:
            if isinstance(op, str):
                op_name = op
                args = []
                kwargs = {}
                initial_state = None
            elif isinstance(op, OperationState):
                op_name = op.name
                args = op.args
                kwargs = op.kwargs
                initial_state = op

            operation = registry.operations.get(op_name)

            self.apply_(operation, *args, initial_state=initial_state, **kwargs)

Run a sequence of operations on dataset data. Internally calls Dataset.apply_ and will resolve named operations in registry.operations

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| operations | List[Union[str, recon.types.OperationState]] | List of operations | required |
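
pipe_ accepts a mixed list: a bare string means "run this operation with no arguments", while an OperationState carries its own args and kwargs. A minimal sketch of that normalization follows, with `StateSketch` and `normalize` as hypothetical names. The TypeError branch is an addition for the sketch, since the source shown has no fallback for other types.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple, Union


@dataclass
class StateSketch:
    # Hypothetical stand-in for OperationState with only the fields pipe_ reads.
    name: str
    args: List[Any] = field(default_factory=list)
    kwargs: Dict[str, Any] = field(default_factory=dict)


def normalize(op: Union[str, StateSketch]) -> Tuple[str, List[Any], Dict[str, Any]]:
    """Turn one pipeline entry into (name, args, kwargs)."""
    if isinstance(op, str):
        return op, [], {}
    if isinstance(op, StateSketch):
        return op.name, op.args, op.kwargs
    raise TypeError(f"Unsupported pipeline entry: {type(op).__name__}")


pipeline = ["drop_empty", StateSketch("rename_label", ["PER"], {"new": "PERSON"})]
plan = [normalize(op) for op in pipeline]
```

Each tuple in `plan` corresponds to one apply_ call in the loop shown above.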

to_disk(self, output_path, force=False, save_examples=True)

Show source code in recon/dataset.py
    def to_disk(self, output_path: Path, force: bool = False, save_examples: bool = True) -> None:
        """Save Corpus to Disk

        Args:
            output_path (Path): Output file path to save data to
            force (bool): Force save to directory. Create parent directories
                or overwrite existing data.
            save_examples (bool): Save the example store along with the state.
        """
        output_path = ensure_path(output_path)
        output_dir = output_path.parent
        state_dir = output_dir / ".recon" / self.name
        if force:
            output_dir.mkdir(parents=True, exist_ok=True)

            if not state_dir.exists():
                state_dir.mkdir(parents=True, exist_ok=True)

        ds_op_state = DatasetOperationsState(
            name=self.name, commit=self.commit_hash, size=len(self), operations=self.operations
        )
        srsly.write_json(state_dir / "state.json", ds_op_state.dict())

        if save_examples:
            self.example_store.to_disk(state_dir / "example_store.jsonl")

        srsly.write_jsonl(output_path, [e.dict() for e in self.data])

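Save the Dataset to disk. The examples are written to output_path, and the operation state is written to a .recon directory next to it.

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| output_path | Path | Output file path to save data to | required |
| force | bool | Force save to directory. Create parent directories or overwrite existing data. | False |
| save_examples | bool | Save the example store along with the state. | True |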
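
The on-disk layout that to_disk writes (and from_disk later looks for) can be read off the source above. The sketch below only computes the paths and touches no files. The helper name `state_paths` and the sample file and dataset names are hypothetical.

```python
from pathlib import Path
from typing import Dict, Union


def state_paths(output_path: Union[str, Path], name: str) -> Dict[str, Path]:
    """Mirror the path arithmetic in to_disk for a dataset called `name`."""
    output_path = Path(output_path)
    state_dir = output_path.parent / ".recon" / name
    return {
        "examples": output_path,  # written with srsly.write_jsonl
        "state": state_dir / "state.json",  # name, commit, size, operations
        "example_store": state_dir / "example_store.jsonl",  # only if save_examples
    }


paths = state_paths("data/train.jsonl", "train")
```

In the source shown, directories are created only when force=True, so a first save into a fresh location presumably needs that flag.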