Decision Tables

DecisionTable: A flexible, serializable SSVC decision table model.

DecisionTable

Bases: _Registered, _SchemaVersioned, _GenericSsvcObject, _Commented, BaseModel

DecisionTable: A flexible, serializable SSVC decision table model.

This model represents a decision table that can be used to map combinations of decision point values to outcomes. It allows for flexible mapping and can be used with helper methods to generate DataFrame and CSV representations of the decision table.
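
As a quick orientation, the sketch below shows the shape of the data a DecisionTable manages: the mapping is a list of row dictionaries keyed by decision point IDs, with one of those keys designated as the outcome. The IDs and value keys here are hypothetical, and the lookup helper is illustrative rather than part of the API.

```python
# Illustrative only: hypothetical decision point IDs and value keys.
outcome_id = "basic:MSCW:1.0.0"

mapping = [
    {"ssvc:E:1.0.0": "N", "ssvc:A:3.0.0": "Y", outcome_id: "W"},
    {"ssvc:E:1.0.0": "P", "ssvc:A:3.0.0": "Y", outcome_id: "S"},
    {"ssvc:E:1.0.0": "A", "ssvc:A:3.0.0": "Y", outcome_id: "M"},
]

def lookup(rows: list[dict[str, str]], inputs: dict[str, str], outcome_key: str) -> str | None:
    """Return the outcome of the first row whose input values match `inputs`."""
    for row in rows:
        if all(row.get(k) == v for k, v in inputs.items()):
            return row[outcome_key]
    return None

print(lookup(mapping, {"ssvc:E:1.0.0": "P", "ssvc:A:3.0.0": "Y"}, outcome_id))  # "S"
```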

Source code in src/ssvc/decision_tables/base.py
class DecisionTable(
    _Registered, _SchemaVersioned, _GenericSsvcObject, _Commented, BaseModel
):
    """
    DecisionTable: A flexible, serializable SSVC decision table model.

    This model represents a decision table that can be used to map combinations of decision point values
    to outcomes. It allows for flexible mapping and can be used with helper methods to generate DataFrame and CSV representations
    of the decision table.

    Attributes:
    """

    key_prefix: ClassVar[str] = "DT"
    _schema_version: ClassVar[str] = SCHEMA_VERSION
    schemaVersion: Literal[SCHEMA_VERSION]

    decision_points: DecisionPointDict

    outcome: str = Field(
        ...,
        description="The key of the decision point in `self.decision_points` that represents the outcome of the decision table.",
        min_length=1,
    )

    # default to empty mapping list
    mapping: list[dict[str, str]] = Field(
        default_factory=list,
        description="Mapping of decision point values to outcomes.",
    )

    @property
    def id(self):
        return f"{self.namespace}:{self.name}:{self.version}"

    @field_validator("key", mode="before")
    @classmethod
    def validate_key(cls, value: str) -> str:
        if value.startswith(f"{cls.key_prefix}_"):
            return value

        # prepend the key prefix if it is not already present
        key = f"{cls.key_prefix}_{value}"
        return key

    @model_validator(mode="after")
    def populate_mapping_if_empty(self):
        """
        Populate the mapping if it is not already set.

        Returns:
            self: The DecisionTable instance with the mapping populated if it was not set. If the mapping is already set, it returns the instance unchanged.
        """
        # short-circuit if mapping is already set
        if self.mapping:
            # mapping is already set, no need to populate
            logger.debug("Mapping is already set, skipping population.")
            return self

        outcome_key = self.outcome

        dps = [
            dp
            for dpid, dp in self.decision_points.items()
            if dpid != outcome_key
        ]
        mapping = dplist_to_toposort(dps)

        # mapping is a list of dicts
        # but mapping doesn't have the outcome key yet
        # add the key with None as the value
        for row in mapping:
            # row is a dict with decision point values
            # we need to add the outcome key
            if outcome_key in row:
                # if the outcome key is already in the row, we should not overwrite it
                logger.warning(
                    f"Outcome key '{outcome_key}' already exists in row, skipping."
                )
            row[outcome_key] = None

        # distribute outcomes evenly across the mapping
        og: DecisionPoint = self.decision_points[outcome_key]

        mapping = distribute_outcomes_evenly(mapping, og)

        # set the mapping
        self.mapping = mapping
        return self

    @model_validator(mode="after")
    def check_mapping_keys(self):
        """
        Validate that each item in the mapping has the correct keys.
        Keys for each item should match the keys of the decision point group.

        Returns:
            self: The DecisionTable instance with validated mapping keys.
        Raises:
            TypeError: If any item in the mapping is not a dictionary.
            ValueError: If any item in the mapping does not have the expected keys.
        """
        # we expect the keys of each item in the mapping to match the decision point group keys
        expected = set(self.decision_points.keys())

        for i, d in enumerate(self.mapping):
            if not isinstance(d, dict):
                raise TypeError(f"Item {i} is not a dict")
            actual_keys = set(d.keys())
            if actual_keys != expected:
                raise ValueError(
                    f"Item {i} has keys {actual_keys}, expected {expected}"
                )
        return self

    @model_validator(mode="after")
    def remove_duplicate_mapping_rows(self):
        seen = dict()
        new_mapping = []
        for row in self.mapping:
            value_tuple = tuple(v for k, v in row.items() if k != self.outcome)
            if value_tuple in seen:
                # we have a duplicate, but is it same or different?
                if seen[value_tuple][self.outcome] == row[self.outcome]:
                    # if it's a match, just log it and move on
                    logger.warning(
                        f"Duplicate mapping found (removed automatically): {row}"
                    )
                else:
                    # they don't match
                    raise ValueError(
                        f"Conflicting mappings found: {seen[value_tuple]} != {row}"
                    )
            else:
                # not a duplicate, add it to the new mapping
                seen[value_tuple] = row
                new_mapping.append(row)
        # set the new mapping (with duplicates removed)
        self.mapping = new_mapping
        return self

    @model_validator(mode="after")
    def check_mapping_coverage(self):
        counts = {}
        all_combos = dpdict_to_combination_list(
            self.decision_points, exclude=[self.outcome]
        )
        # all_combos is a dict of all possible combinations of decision point values
        # keyed by decision point ID, with value keys as values.
        # initialize counts for all input combinations to 0
        for combo in all_combos:
            value_tuple = tuple(combo.values())
            counts[value_tuple] = counts.get(value_tuple, 0)

        # counts now has all possible input combinations set to count 0

        for row in self.mapping:
            value_tuple = tuple(v for k, v in row.items() if k != self.outcome)
            counts[value_tuple] += 1

        # check if all combinations are covered
        for k, v in counts.items():
            if v == 1:
                # ok, proceed
                continue
            elif v == 0:
                # missing combination
                raise ValueError(
                    f"Mapping is incomplete: No mapping found for decision point combination: {k}."
                )
            elif v > 1:
                # duplicate. remove duplicate mapping rows should have caught this already
                raise ValueError(
                    f"Duplicate mapping found for decision point combination: {k}."
                )
            else:
                raise ValueError(
                    f"Unexpected count in mapping coverage check.{k}: {v}"
                )

        # if you made it to here, all the counts were 1, so we're good

        return self

    @model_validator(mode="after")
    def validate_mapping(self):
        """
        Validate the mapping after it has been populated.

        This method checks that the mapping is consistent with the decision points and outcomes defined in the table.
        It raises a ValueError if the mapping is not valid.

        Returns:
            self: The DecisionTable instance with validated mapping.
        """
        if not self.mapping:
            raise ValueError("Mapping must be set before validation.")

        # Check that each MappingRow has the correct number of decision point values
        for row in self.mapping:
            if set(row.keys()) != set(self.decision_points.keys()):
                raise ValueError(
                    "MappingRow does not have the correct keys. "
                    "Keys must match the decision point group keys."
                )

        # Verify the topological order of the decision points (if u<v then u_outcome <= v_outcome)
        problems = check_topological_order(self)
        if len(problems) > 0:
            logger.warning("Topological order check found problems:")
            for problem in problems:
                logger.warning(f"Problem: {problem}")
            raise ValueError(
                "Topological order check failed. See logs for details."
            )
        else:
            logger.debug("Topological order check passed with no problems.")

        # if there's only one decision point mapping to the outcome, we can stop here
        input_cols = [
            dp for dp in self.decision_points.values() if dp.id != self.outcome
        ]
        if len(input_cols) <= 1:
            return self

        # reject if any irrelevant columns are present in the mapping
        fi = feature_importance(self)
        irrelevant_features = fi[fi["feature_importance"] <= 0]
        if not irrelevant_features.empty:
            logger.warning(
                "Mapping contains irrelevant features: "
                f"{', '.join(irrelevant_features['feature'].tolist())}"
            )
            raise ValueError(
                "Mapping contains irrelevant features. "
                "Please remove them before proceeding."
            )

        return self

    def obfuscate(self) -> "DecisionTable":
        """
        Obfuscate the decision table by renaming the dict keys.
        """
        obfuscated_dpdict, translator = obfuscate_dict(self.decision_points)

        new_table = self.model_copy(deep=True)
        new_table.decision_points = obfuscated_dpdict
        new_table.outcome = translator.get(self.outcome, self.outcome)
        # replace all the keys in mapping dicts
        new_table.mapping = []
        for row in self.mapping:
            new_row = {}
            for key in row.keys():
                new_key = translator[key]
                new_row[new_key] = row[key]
            new_table.mapping.append(new_row)

        return new_table

check_mapping_keys()

Validate that each item in the mapping has the correct keys. Keys for each item should match the keys of the decision point group.

Returns:
    self (DecisionTable): The DecisionTable instance with validated mapping keys.

Raises:
    TypeError: If any item in the mapping is not a dictionary.
    ValueError: If any item in the mapping does not have the expected keys.

Source code in src/ssvc/decision_tables/base.py
@model_validator(mode="after")
def check_mapping_keys(self):
    """
    Validate that each item in the mapping has the correct keys.
    Keys for each item should match the keys of the decision point group.

    Returns:
        self: The DecisionTable instance with validated mapping keys.
    Raises:
        TypeError: If any item in the mapping is not a dictionary.
        ValueError: If any item in the mapping does not have the expected keys.
    """
    # we expect the keys of each item in the mapping to match the decision point group keys
    expected = set(self.decision_points.keys())

    for i, d in enumerate(self.mapping):
        if not isinstance(d, dict):
            raise TypeError(f"Item {i} is not a dict")
        actual_keys = set(d.keys())
        if actual_keys != expected:
            raise ValueError(
                f"Item {i} has keys {actual_keys}, expected {expected}"
            )
    return self

obfuscate()

Obfuscate the decision table by renaming the dict keys.
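
A brief usage sketch, assuming dt is an existing DecisionTable instance:

```python
# Assuming `dt` is an existing DecisionTable instance:
obfuscated = dt.obfuscate()

# The original table is unchanged; the copy has renamed keys in
# `decision_points`, `outcome`, and every row of `mapping`.
assert obfuscated is not dt
```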

Source code in src/ssvc/decision_tables/base.py
def obfuscate(self) -> "DecisionTable":
    """
    Obfuscate the decision table by renaming the dict keys.
    """
    obfuscated_dpdict, translator = obfuscate_dict(self.decision_points)

    new_table = self.model_copy(deep=True)
    new_table.decision_points = obfuscated_dpdict
    new_table.outcome = translator.get(self.outcome, self.outcome)
    # replace all the keys in mapping dicts
    new_table.mapping = []
    for row in self.mapping:
        new_row = {}
        for key in row.keys():
            new_key = translator[key]
            new_row[new_key] = row[key]
        new_table.mapping.append(new_row)

    return new_table

populate_mapping_if_empty()

Populate the mapping if it is not already set.

Returns:
    self (DecisionTable): The DecisionTable instance with the mapping populated if it was not set. If the mapping is already set, the instance is returned unchanged.

Source code in src/ssvc/decision_tables/base.py
@model_validator(mode="after")
def populate_mapping_if_empty(self):
    """
    Populate the mapping if it is not already set.

    Returns:
        self: The DecisionTable instance with the mapping populated if it was not set. If the mapping is already set, it returns the instance unchanged.
    """
    # short-circuit if mapping is already set
    if self.mapping:
        # mapping is already set, no need to populate
        logger.debug("Mapping is already set, skipping population.")
        return self

    outcome_key = self.outcome

    dps = [
        dp
        for dpid, dp in self.decision_points.items()
        if dpid != outcome_key
    ]
    mapping = dplist_to_toposort(dps)

    # mapping is a list of dicts
    # but mapping doesn't have the outcome key yet
    # add the key with None as the value
    for row in mapping:
        # row is a dict with decision point values
        # we need to add the outcome key
        if outcome_key in row:
            # if the outcome key is already in the row, we should not overwrite it
            logger.warning(
                f"Outcome key '{outcome_key}' already exists in row, skipping."
            )
        row[outcome_key] = None

    # distribute outcomes evenly across the mapping
    og: DecisionPoint = self.decision_points[outcome_key]

    mapping = distribute_outcomes_evenly(mapping, og)

    # set the mapping
    self.mapping = mapping
    return self

validate_mapping()

Validate the mapping after it has been populated.

This method checks that the mapping is consistent with the decision points and outcomes defined in the table. It raises a ValueError if the mapping is not valid.

Returns:
    self (DecisionTable): The DecisionTable instance with validated mapping.

Source code in src/ssvc/decision_tables/base.py
@model_validator(mode="after")
def validate_mapping(self):
    """
    Validate the mapping after it has been populated.

    This method checks that the mapping is consistent with the decision points and outcomes defined in the table.
    It raises a ValueError if the mapping is not valid.

    Returns:
        self: The DecisionTable instance with validated mapping.
    """
    if not self.mapping:
        raise ValueError("Mapping must be set before validation.")

    # Check that each MappingRow has the correct number of decision point values
    for row in self.mapping:
        if set(row.keys()) != set(self.decision_points.keys()):
            raise ValueError(
                "MappingRow does not have the correct keys. "
                "Keys must match the decision point group keys."
            )

    # Verify the topological order of the decision points (if u<v then u_outcome <= v_outcome)
    problems = check_topological_order(self)
    if len(problems) > 0:
        logger.warning("Topological order check found problems:")
        for problem in problems:
            logger.warning(f"Problem: {problem}")
        raise ValueError(
            "Topological order check failed. See logs for details."
        )
    else:
        logger.debug("Topological order check passed with no problems.")

    # if there's only one decision point mapping to the outcome, we can stop here
    input_cols = [
        dp for dp in self.decision_points.values() if dp.id != self.outcome
    ]
    if len(input_cols) <= 1:
        return self

    # reject if any irrelevant columns are present in the mapping
    fi = feature_importance(self)
    irrelevant_features = fi[fi["feature_importance"] <= 0]
    if not irrelevant_features.empty:
        logger.warning(
            "Mapping contains irrelevant features: "
            f"{', '.join(irrelevant_features['feature'].tolist())}"
        )
        raise ValueError(
            "Mapping contains irrelevant features. "
            "Please remove them before proceeding."
        )

    return self

check_topological_order(dt)

Check the topological order of the decision table. This function uses the check_topological_order function from the csv_analyzer module and returns a list of dictionaries describing any problems found in the check.

Parameters:
    dt (DecisionTable): The decision table to check. Required.

Returns:
    list[dict]: A list of dictionaries containing any problems found in the topological order check. Problems are defined as any pair of mappings (u, v) where u < v but u_outcome > v_outcome. Each dictionary contains the following keys:

    - "u": The lower decision point value
    - "v": The higher decision point value
    - "u_outcome": The outcome of the lower decision point value
    - "v_outcome": The outcome of the higher decision point value

Source code in src/ssvc/decision_tables/base.py
def check_topological_order(dt: DecisionTable) -> list[dict]:
    """
    Check the topological order of the decision table.
    This function uses the `check_topological_order` function from the csv_analyzer module to verify the topological order of the decision table.
    It returns a list of dictionaries containing any problems found in the topological order check.

    Args:
        dt: DecisionTable: The decision table to check.

    Returns:
        list[dict]: A list of dictionaries containing any problems found in the topological order check.
        Problems are defined as any pair of mappings `(u,v)` where `u < v` but `u_outcome > v_outcome`.

        Each dictionary contains the following keys:
        "u": The lower decision point value
        "v": The higher decision point value
        "u_outcome": The outcome of the lower decision point value
        "v_outcome": The outcome of the higher decision point value

    """
    from ssvc.csv_analyzer import check_topological_order

    logger.debug("Checking topological order of the decision table.")
    df = decision_table_to_shortform_df(dt)
    target = _get_target_column_name(df.columns[-1])
    target_values = dt.decision_points[dt.outcome].values
    target_value_order = [v.key for v in target_values]
    return check_topological_order(
        df, target=target, target_value_order=target_value_order
    )

decision_table_to_csv(dt, **kwargs)

Wrapper around decision_table_to_df() to export the decision table to a CSV string.

Parameters:
    dt (DecisionTable): The decision table to export.
    **kwargs: Additional keyword arguments passed to pandas.DataFrame.to_csv().

Returns:
    str: The mapping table as a CSV string.
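
A usage sketch, assuming dt is an existing DecisionTable; keyword arguments pass straight through to pandas.DataFrame.to_csv():

```python
# Assuming `dt` is an existing DecisionTable instance:
csv_text = decision_table_to_csv(dt, index=False)  # index=False is a standard pandas to_csv kwarg
print(csv_text)
```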

Source code in src/ssvc/decision_tables/base.py
def decision_table_to_csv(dt: DecisionTable, **kwargs) -> str:
    """Wrapper around to_df to export to CSV string.
    Args:
        dt (DecisionTable): The decision table to export.
        kwargs: Additional keyword arguments to pass to pandas.DataFrame.to_csv().

    Returns:
        str: The mapping table as a CSV string.
    """
    return decision_table_to_df(dt).to_csv(**kwargs)

decision_table_to_df(dt, longform=False)

Export the decision table to a pandas DataFrame.

This is just a wrapper around the shortform and longform export functions.

Parameters:
    dt (DecisionTable): The decision table to export. Required.
    longform (bool): Whether to export in long form or short form. Defaults to False (short form).

Returns:
    pd.DataFrame: The mapping table as a pandas DataFrame.
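
A usage sketch, assuming dt is an existing DecisionTable instance:

```python
short_df = decision_table_to_df(dt)                # columns and cells use namespace:key:version IDs and value keys
long_df = decision_table_to_df(dt, longform=True)  # human-readable decision point names and value names
```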

Source code in src/ssvc/decision_tables/base.py
def decision_table_to_df(dt: DecisionTable, longform=False) -> pd.DataFrame:
    """
    Export the decision table to a pandas DataFrame.

    This is just a wrapper around the shortform and longform export functions.

    Args:
        dt (DecisionTable): The decision table to export.
        longform (bool): Whether to export in long form or short form. Defaults to False (short form).
    Returns:
        pd.DataFrame: The mapping table as a pandas DataFrame.

    """
    if longform:
        return decision_table_to_longform_df(dt)
    return decision_table_to_shortform_df(dt)

decision_table_to_longform_df(dt)

Given a DecisionTable, convert it to a long-form DataFrame. The DataFrame will have one row per decision point value combination, with columns for each decision point and the outcome. The column names are the decision point names with their versions, and the cell values are the value names. If the decision point is from a namespace other than "ssvc", the column name includes the namespace in parentheses.

Example

Column Heading format: {decision_point_name} v{version} ({namespace})

row,Supplier Involvement v1.0.0,Exploitation v1.0.0,Public Value Added v1.0.0,MoSCoW v1.0.0 (basic)
0,fix ready,none,limited,won't
1,fix ready,none,ampliative,won't
2,fix ready,none,precedence,won't

Parameters:
    dt (DecisionTable): The decision table to convert. Required.

Returns:
    pd.DataFrame: The converted DataFrame.

Source code in src/ssvc/decision_tables/base.py
def decision_table_to_longform_df(dt: DecisionTable) -> pd.DataFrame:
    """
    Given a DecisionTable, convert it to a long-form DataFrame.
    The DataFrame will have one row per decision point value combination, with columns for each decision point and the outcome.
    The column names will be the decision point names with their versions, and the values will be the value names.
    If the decision point is from a namespace other than "ssvc", the column name will include the namespace in parentheses.


    Example:
        Column Heading format: `{decision_point_name} v{version} ({namespace})`

        ```csv
        row,Supplier Involvement v1.0.0,Exploitation v1.0.0,Public Value Added v1.0.0,MoSCoW v1.0.0 (basic)
        0,fix ready,none,limited,won't
        1,fix ready,none,ampliative,won't
        2,fix ready,none,precedence,won't
        ```

    Args:
        dt (DecisionTable): The decision table to convert.

    Returns:
        pd.DataFrame: The converted DataFrame.

    """

    df = decision_table_to_shortform_df(dt)

    def _col_check(col: str) -> bool:
        """
        Check if the column is a valid decision point or outcome column.
        Args:
            col: a colon-separated string representing a decision point or outcome column in the format `namespace:dp_key:version`.

        Returns:
            bool: True if the column is a valid decision point or outcome column, False otherwise.

        """
        # late-binding import to avoid circular import issues
        registry = get_registry()

        ns, dp_key, version = col.split(":")

        return (
            registry.lookup(
                objtype="DecisionPoint",
                namespace=ns,
                key=dp_key,
                version=version,
            )
            is not None
        )

    # Replace cell values using DPV_REGISTRY
    for col in df.columns:
        logger.debug(f"Converting column: {col}")

        ns, dp_key, version = col.split(":")

        if _col_check(col):
            dp_id = col
            newcol = df[col].apply(_replace_value_keys, dp_id=dp_id)
            df[col] = newcol

    # lowercase all cell values
    df = df.apply(
        lambda col: col.map(lambda x: x.lower() if isinstance(x, str) else x)
    )

    # Rename columns using DP_REGISTRY

    rename_map = {
        col: _rename_column(col) for col in df.columns if _col_check(col)
    }

    df = df.rename(
        columns=rename_map,
    )

    return df

decision_table_to_shortform_df(dt)

Export the mapping to pandas DataFrame.

Columns: one per decision point, one for outcome. Column names are namespace:key:version. Individual decision point and outcome values are represented by their value key.

Example

Table values might look like:

ssvc:SINV:1.0.0,ssvc:E:1.0.0,ssvc:PVA:1.0.0,basic:MSCW:1.0.0
FR,N,L,W
FR,N,A,W
FR,N,P,W
FR,P,L,W
FR,P,A,W
FR,P,P,W
FR,A,L,W
FR,A,A,C
etc.

Returns:
    df (pd.DataFrame): The mapping as a pandas DataFrame.

Raises:
    ValueError: If the decision table has no mapping to export.

Source code in src/ssvc/decision_tables/base.py
def decision_table_to_shortform_df(dt: DecisionTable) -> pd.DataFrame:
    """
    Export the mapping to pandas DataFrame.

    Columns: one per decision point, one for outcome. Column names are namespace:key:version.
    Individual decision point and outcome values are represented by their value key.

    Example:
        Table values might look like:

        ```csv
        ssvc:SINV:1.0.0,ssvc:E:1.0.0,ssvc:PVA:1.0.0,basic:MSCW:1.0.0
        FR,N,L,W
        FR,N,A,W
        FR,N,P,W
        FR,P,L,W
        FR,P,A,W
        FR,P,P,W
        FR,A,L,W
        FR,A,A,C
        ```
        etc.

    Returns:
        df: pd.DataFrame: The mapping as a pandas DataFrame.

    Raises:
        ValueError: If the decision table has no mapping to export.
    """
    if not dt.mapping:
        raise ValueError("Decision Table has no mapping to export.")

    df = pd.DataFrame(dt.mapping)
    return df

distribute_outcomes_evenly(mapping, outcome_group)

Distribute the outcome group's values across the mapping item dicts in sorted order. Overwrites the outcome value in each mapping dict item with the corresponding outcome value. The earliest mappings get the lowest outcome value, the latest get the highest. If the mapping count is not evenly divisible by the number of outcomes, the earlier outcome(s) absorb the remainder. Returns a new list of dicts with outcome values assigned, as shown in the sketch below.

Parameters:
    mapping (list[dict[str, str]]): The mapping to distribute outcomes across. Required.
    outcome_group (DecisionPoint): The outcome decision point whose value keys are distributed. Required.

Returns:
    list[dict[str, str]]: A new list of dicts with outcome values assigned.
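
A standalone sketch of the distribution arithmetic, using plain dictionaries and hypothetical outcome keys (the real function takes a DecisionPoint outcome group rather than a bare list of keys):

```python
# 8 rows, 3 outcomes: base = 8 // 3 = 2, rem = 8 % 3 = 2,
# so the first two outcomes each cover 3 rows and the last covers 2.
rows = [{"input": i, "outcome": None} for i in range(8)]
outcome_keys = ["low", "medium", "high"]  # assumed to be in lowest-to-highest order

n, k = len(rows), len(outcome_keys)
base, rem = n // k, n % k

idx = 0
for i, outcome in enumerate(outcome_keys):
    count = base + (1 if i < rem else 0)
    for _ in range(count):
        rows[idx]["outcome"] = outcome
        idx += 1

print([r["outcome"] for r in rows])
# ['low', 'low', 'low', 'medium', 'medium', 'medium', 'high', 'high']
```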

Source code in src/ssvc/decision_tables/base.py
def distribute_outcomes_evenly(
    mapping: list[dict[str, str]], outcome_group: DecisionPoint
) -> list[dict[str, str]]:
    """
    Distribute the given outcome_values across the mapping item dicts in sorted order.
    Overwrites the outcome value in each mapping dict item with the corresponding outcome value.
    The earliest mappings get the lowest outcome value, the latest get the highest.
    If the mapping count is not divisible by the number of outcomes, the earlier outcome(s) get the remainder.
    Returns a new list of dicts with outcome values assigned.

    Args:
        mapping (list[dict[str,str]]): The mapping to distribute outcomes across.
        outcome_group (DecisionPoint): The outcome decision point whose value keys are distributed.
    Returns:
        list[dict[str,str]]: A new list of dicts with outcome values assigned.
    """
    outcome_values = [ov.key for ov in outcome_group.values]

    if not outcome_values:
        raise ValueError("No outcome values provided for distribution.")

    og_id = outcome_group.id

    n = len(mapping)
    k = len(outcome_values)
    base = n // k
    rem = n % k
    new_mapping = []
    idx = 0
    for i, outcome in enumerate(outcome_values):
        count = base + (1 if i < rem else 0)
        for _ in range(count):
            if idx >= n:
                break
            row = mapping[idx]
            row[og_id] = outcome
            new_mapping.append(row)
            idx += 1
    return new_mapping

dpdict_to_combination_list(dpdict, exclude=[])

Generate all combinations of decision point values as dictionaries. Each combination is a dictionary with decision point IDs as keys and value keys as values.
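
The same idea with plain dictionaries and itertools.product (a simplified sketch that does not use the actual DecisionPoint model; the IDs and value keys are hypothetical):

```python
from itertools import product

dp_values = {
    "ssvc:E:1.0.0": ["N", "P", "A"],
    "ssvc:A:3.0.0": ["Y", "N"],
}

combos = [
    dict(zip(dp_values.keys(), values))
    for values in product(*dp_values.values())
]
print(len(combos))  # 6 combinations
print(combos[0])    # {'ssvc:E:1.0.0': 'N', 'ssvc:A:3.0.0': 'Y'}
```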

Source code in src/ssvc/decision_tables/base.py
def dpdict_to_combination_list(
    dpdict: dict[str, DecisionPoint],
    exclude: list[str] = [],
) -> list[dict[str, str]]:
    """
    Generate all combinations of decision point values as dictionaries.
    Each combination is a dictionary with decision point IDs as keys and value keys as values.
    """
    dpg_vals = []
    for dp in dpdict.values():
        if dp.id in exclude:
            # skip this decision point if it is in the exclude list
            continue
        vals = []
        for value in dp.values:
            row = {dp.id: value.key}
            vals.append(row)
        dpg_vals.append(vals)

    # now we have a list of lists of dicts; take the Cartesian product to get the combinations
    combos = []
    for prod in product(*dpg_vals):
        # prod is a tuple of dicts, we need to merge them
        merged = {}
        for d in prod:
            merged.update(d)
        combos.append(merged)
    return combos

feature_importance(dt)

Calculate feature importance for the decision table.

Parameters:
    dt (DecisionTable): The decision table to analyze. Required.

Returns:
    pd.DataFrame: Feature importance scores for the input decision points.
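
A usage sketch, assuming dt is an existing DecisionTable; the column names shown are those referenced elsewhere in this module:

```python
# Assuming `dt` is an existing DecisionTable instance:
fi = feature_importance(dt)
print(fi[["feature", "feature_importance"]])
```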

Source code in src/ssvc/decision_tables/base.py
def feature_importance(dt: DecisionTable) -> pd.DataFrame:
    """
    Calculate feature importance for the decision table.
    Args:
        dt: The decision table to analyze.

    Returns:
        pd.DataFrame: Feature importance scores for the input decision points.

    """
    from ssvc.csv_analyzer import drop_col_feature_importance

    logger.debug("Calculating feature importance for the decision table.")

    df = decision_table_to_shortform_df(dt)
    # target is the last column in the DataFrame, which is the outcome column
    target = _get_target_column_name(df.columns[-1])

    return drop_col_feature_importance(df, target=target)

interpret_feature_importance(dt)

Interpret the feature importance for the decision table. This function is a wrapper around the feature_importance function to provide a more user-friendly output. It sorts the features by importance and adds a commentary column that describes the importance of each feature, calling out the most important features, those above median importance, low to medium importance features, low importance features, and irrelevant features. The commentary is based on the computed feature importance scores.

This function is useful for understanding which decision points and their values are most influential in the decision-making process of the table, and can help in identifying which features can be considered for removal or further investigation.

Parameters:
    dt (DecisionTable): The decision table to analyze. Required.

Returns:
    pd.DataFrame: A DataFrame containing the feature importance scores.
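
A usage sketch, assuming dt is an existing DecisionTable instance:

```python
report = interpret_feature_importance(dt)
print(report[["feature", "feature_importance", "Commentary"]].to_string(index=False))
```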

Source code in src/ssvc/decision_tables/base.py
def interpret_feature_importance(dt: DecisionTable) -> pd.DataFrame:
    """
    Interpret the feature importance for the decision table.
    This function is a wrapper around the feature_importance function to provide a more user-friendly output.
    It sorts the features by importance and adds a commentary column that describes the importance of each feature,
    calling out the most important features, those above median importance, low to medium importance features,
    low importance features, and irrelevant features. The commentary is based on the computed feature importance scores.

    This function is useful for understanding which decision points and their values are most influential in the decision-making process of the table,
    and can help in identifying which features can be considered for removal or further investigation.

    Args:
        dt (DecisionTable): The decision table to analyze.
    Returns:
        pd.DataFrame: A DataFrame containing the feature importance scores.
    """

    fi_df = feature_importance(dt)

    logger.debug("Interpreting feature importance for the decision table.")
    col = "feature_importance"
    fi_df = fi_df.sort_values(by=col, ascending=False)
    # add a column for commentary
    # low importance are those with importance< 0.1 * max(importance)
    # irrelevant features are those with importance <= 0
    max_importance = fi_df[col].max()
    logger.debug(f"Max importance: {max_importance}")
    median_importance = fi_df[col].median()
    logger.debug(f"Median importance: {median_importance}")
    low_threshold = 0.1 * fi_df[col].max()
    logger.debug(f"Low threshold: {low_threshold}")
    irrelevant_threshold = 0.0

    def _label_importance(importance: float) -> str:
        """
        Label the importance of a feature based on its importance score.
        The thresholds are computed relative to the maximum and median importance scores.

        Args:
            importance: The feature importance score to label.

        Returns:
            str: A short commentary describing the importance level.
        """
        comments = []

        if importance == max_importance:
            comments.append("Most important feature")
        elif importance > median_importance:
            comments.append("Medium-high importance feature")
        elif importance == median_importance:
            comments.append("Median importance feature")
        elif low_threshold <= importance < median_importance:
            comments.append("Low-medium importance feature")
        elif irrelevant_threshold < importance < low_threshold:
            comments.append("Low importance feature")
        elif importance <= irrelevant_threshold:
            comments.append("Irrelevant feature")

        return "; ".join(comments)

    logger.debug("Adding feature importance commentary.")
    fi_df["Commentary"] = fi_df[col].apply(_label_importance)

    return fi_df.reset_index(drop=True)

Provides helper functions for decision tables in SSVC.

dt2df_md(dt, longform=True)

Convert a decision table to a Markdown table string.

Parameters:
    dt (DecisionTable): The decision table to convert.
    longform (bool): Whether to use the longform or shortform DataFrame. Defaults to True.

Returns:
    str: A Markdown table representation of the decision table mapping, indexed by row.
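
A usage sketch, assuming dt is an existing DecisionTable instance:

```python
md_table = dt2df_md(dt, longform=True)
print(md_table)  # Markdown table with a "Row" index column
```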

Source code in src/ssvc/decision_tables/helpers.py
def dt2df_md(
    dt: "DecisionTable",
    longform: bool = True,
) -> str:
    """
    Convert a decision table to a Markdown table string.
    Args:
        dt (DecisionTable): The decision table to convert.
        longform (bool): Whether to use the longform or shortform DataFrame.
    Returns:
        str: A Markdown table representation of the decision table mapping.
    """
    if longform:
        df = decision_table_to_longform_df(dt)
    else:
        df = decision_table_to_shortform_df(dt)

    df.index.rename("Row", inplace=True)
    return df.to_markdown(index=True)

mapping2mermaid(rows, title=None)

Convert a decision table mapping to a Mermaid graph.

Parameters:
    rows (list[dict[str, str]]): A list of dictionaries representing the decision table mapping. Each dictionary corresponds to a row in the table, with keys as column names and values as cell values. Each row should have the same keys, representing the columns of the decision table.
    title (str): An optional title for the graph. Defaults to None.

Returns:
    str: A string containing a markdown Mermaid graph representation, including the code block markers.
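
A usage sketch with hypothetical mapping rows; in practice these come from DecisionTable.mapping:

```python
rows = [
    {"ssvc:E:1.0.0": "N", "basic:MSCW:1.0.0": "W"},
    {"ssvc:E:1.0.0": "P", "basic:MSCW:1.0.0": "S"},
    {"ssvc:E:1.0.0": "A", "basic:MSCW:1.0.0": "M"},
]
diagram = mapping2mermaid(rows, title="Example decision table")
print(diagram)  # markdown string containing the mermaid code block markers
```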

Source code in src/ssvc/decision_tables/helpers.py
def mapping2mermaid(rows: list[dict[str:str]], title: str = None) -> str:
    """
    Convert a decision table mapping to a Mermaid graph.
    Args:
        rows (list[dict[str:str]]): A list of dictionaries representing the decision table mapping.
            Each dictionary corresponds to a row in the table, with keys as column names and values as cell values.
            Each row should have the same keys, representing the columns of the decision table.
    Returns:
        str: A string containing a markdown Mermaid graph representation, including the code block markers.
    """
    try:
        return _mapping2mermaid(rows, title=title)
    except ValueError as e:
        # graph is too big, split it into smaller graphs
        # one graph per value in the first column
        first_col = list(rows[0].keys())[0]
        diagrams = []

        # find unique values but keep them in order of appearance
        _uniq_set = set()
        uniq_values = []
        for row in rows:
            if row[first_col] in _uniq_set:
                continue
            _uniq_set.add(row[first_col])
            uniq_values.append(row[first_col])

        for value in uniq_values:
            filtered_rows = [row for row in rows if row[first_col] == value]
            if not filtered_rows:
                continue
            try:
                diagram = _mapping2mermaid(
                    filtered_rows, title=f"{title} - {first_col}:{value}"
                )
                diagrams.append(diagram)
            except ValueError as e:
                logger.error(f"Skipping {title} {value} due to error: {e}")

        return (
            "\n\n".join(diagrams)
            if diagrams
            else "No valid diagrams generated."
        )