Vultron Case States Module

vultron.case_states

The vultron.case_states package implements the CVD Case State Model.

vultron.case_states.hypercube

The vultron.case_states.hypercube module contains the CVDmodel class, which represents the state graph of a Coordinated Vulnerability Disclosure case.

Based on Householder, A. D., and Jonathan Spring. A State-Based Model for Multi-Party Coordinated Vulnerability Disclosure (MPCVD). Tech. Rep. CMU/SEI-2021-SR-021, Software Engineering Institute, Carnegie Mellon University, Pittsburgh, PA, 2021.

DESIDERATA = (('V', 'P'), ('V', 'X'), ('V', 'A'), ('F', 'P'), ('F', 'X'), ('F', 'A'), ('D', 'P'), ('D', 'X'), ('D', 'A'), ('P', 'X'), ('P', 'A'), ('X', 'A')) module-attribute

Taken directly from the paper: Given (A,B), you prefer histories in which A precedes B over ones in which B precedes A.
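
The preference rule above can be checked mechanically against a history string. The following is a minimal sketch, not the module's code: the helper names `assess_history` and `count_met` are hypothetical, and the two example histories are chosen only to show the extremes (their validity as CVD histories is not checked here).

```python
# Desiderata as listed above: each pair (A, B) means "A before B is preferred".
DESIDERATA = (
    ("V", "P"), ("V", "X"), ("V", "A"),
    ("F", "P"), ("F", "X"), ("F", "A"),
    ("D", "P"), ("D", "X"), ("D", "A"),
    ("P", "X"), ("P", "A"), ("X", "A"),
)


def assess_history(history: str) -> dict:
    """Map each desideratum (A, B) to True when A occurs before B in history."""
    return {(a, b): history.index(a) < history.index(b) for (a, b) in DESIDERATA}


def count_met(history: str) -> int:
    """Count how many desiderata a history satisfies (hypothetical helper)."""
    return sum(assess_history(history).values())


best = count_met("VFDPXA")   # every preferred ordering holds
worst = count_met("AXPVFD")  # every preferred ordering is reversed
```

A history is treated as a string of six event letters, so `str.index` is enough to compare the positions of two events.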

CVDmodel

A CVDmodel is a graph of states and transitions between them. The model reflects the CVD process and the actions that can be taken at a high level. It also has a set of histories that can be scored.

Source code in vultron/case_states/hypercube.py
class CVDmodel:
    """
    A CVDmodel is a graph of states and transitions between them.
    The model reflects the CVD process and the actions that can be taken at a high level.
    It also has a set of histories that can be scored.
    """

    _D = DESIDERATA

    def __init__(self):
        # the graph of states
        self.G = None

        # the set of histories
        self.H = None
        self.H_prob = None
        self.H_score = None

        self._rounds_to_cover = None
        self.d_cols = None
        self.w_cols = None
        self.H_df = None
        self.S_df = None
        self.f_d = None
        self.d_to_state_pattern = None
        self.state_pattern_to_d = None
        self.not_d_to_state_pattern = None
        self.state_pattern_to_not_d = None
        self.state_good = None
        self.state_bad = None

        self._setup()

    def _setup(self):
        """
        Setup the model once it has been instantiated
        """
        # create the graph of states
        self.G = _create_graph()

        # walk the graph to collect histories
        self.H = self.histories

        # compute the probabilities of each history
        self.H_prob = self._compute_h_frequencies()

        # build a dataframe of what you have so far
        self.H_df = self._init_H_df()

        # once you have a dataframe you can compute
        # frequencies of individual desiderata weighted
        # by the frequencies of the histories in which
        # they occur
        self.f_d = self._compute_f_d()

        # add a column to our history data for h_scores
        self.H_df = self._compute_tfidf()

        # we need to turn our desiderata into state patterns
        # start with computing the position of each element
        # in our state naming convention
        self.idx = self._construct_index()

        # now turn the desiderata into state patterns
        self.d_to_state_pattern = self._construct_good_patterns()
        # invert the lookup too
        self.state_pattern_to_d = {
            v: k for k, v in self.d_to_state_pattern.items()
        }

        self.not_d_to_state_pattern = self._construct_bad_patterns()
        # invert the lookup too
        self.state_pattern_to_not_d = {
            v: k for k, v in self.not_d_to_state_pattern.items()
        }

        self.state_good = self._build_good()
        self.state_bad = self._build_bad()

        # self.S_score = self._compute_s_scores()
        # construct a dataframe for states
        self.S_df = self._init_sdf()

    @property
    def states(self) -> list:
        """
        Returns the states in the model
        """
        return self.G.nodes

    @ensure_valid_state
    def state_info(self, state: str) -> dict:
        """
        Returns the info for a given state

        Args:
            state: the state to return info for

        Returns:
            a dict of info for the state
        """

        data = {
            "state": state,
            "score": self.score_state(state),
            "vfd": vfd(state),
            "pxa": pxa(state),
            "predecessors": list(self.previous_state(state)),
            "successors": list(self.next_state(state)),
            "explain": explain(state),
            "info": info(state),
            "actions": action(state),
            "zeroday_type": zeroday_type(state),
        }
        return data

    @ensure_valid_state
    def previous_state(self, state: str) -> list:
        """
        For a given state, return the previous state(s)

        Args:
            state: the state to return the previous state(s) for

        Returns:
            a list of previous states
        """
        # predecessors() yields an iterator; materialize it to match the declared return type
        return list(self.G.predecessors(state))

    @ensure_valid_state
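    def next_state(self, state: str, transition=None):
        """
        For a given state, return the next state(s)

        Args:
            state: the state to return the next state(s) for
            transition: the label of a single transition to follow, default=None

        Returns:
            a list of all successor states if transition is None, otherwise
            the successor reached by that transition (None if there is none)
        """
        if transition is None:
            return list(self.G.successors(state))

        # if you got here transition is not None
        for successor in self.G.successors(state):
            try:
                is_valid_transition(state, successor)
            except TransitionValidationError as e:
                raise CVDmodelError(e) from e

            edge_data = self.G.get_edge_data(state, successor)
            if edge_data["label"] == transition:
                return successor

        # no successor is reached by the requested transition
        return None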

    # paths are a list of edges from the graph
    # [(u,v),(v,w),(w,x)...]
    @ensure_valid_state
    def paths_between(
        self, start: str = "vfdpxa", end: str = "VFDPXA"
    ) -> List[tuple]:
        """
        Return all paths of transitions between two states

        Args:
            start: the start state, default="vfdpxa"
            end: the end state, default="VFDPXA"

        Returns:
            a list of paths
        """
        G = self.G
        return nx.all_simple_edge_paths(G, start, end)

    @ensure_valid_state
    def paths_from(self, state: str = "vfdpxa") -> List[tuple]:
        """
        Return all paths of transitions that lead from a given state

        Args:
            state: the state to return paths from, default="vfdpxa"

        Returns:
            a list of paths
        """
        return self.paths_between(start=state, end="VFDPXA")

    @ensure_valid_state
    def paths_to(self, state: str = "VFDPXA") -> List[tuple]:
        """
        Return all paths of transitions that lead to a given state

        Args:
            state: the state to return paths to, default="VFDPXA"

        Returns:
            a list of paths
        """
        return self.paths_between(start="vfdpxa", end=state)

    @property
    def histories(self) -> list:
        """
        A list of all possible case histories traversing the case state space from _vfdpxa_ to _VFDPXA_.
        """
        _H = self.sequences_between(start="vfdpxa", end="VFDPXA")
        H = ["".join(h) for h in _H]
        return H

    # sequences are a list of the labels along edges from the graph
    # ["VPXFDA","XPVAFD"...], ["VAF", "XPV", ], etc.
    def sequences_from(self, state: str = "vfdpxa") -> List[str]:
        """
        Return all sequences of transitions that lead from a given state

        Args:
            state: the state to return sequences from

        Returns:
            a list of sequences
        """
        return self.sequences_between(start=state)

    @ensure_valid_state
    def sequences_to(self, state: str) -> List[str]:
        """
        Return all sequences of transitions that lead to a given state

        Args:
            state: the state to return sequences to

        Returns:
            a list of sequences

        """
        return self.sequences_between(end=state)

    @ensure_valid_state
    def sequences_between(
        self, start: str = "vfdpxa", end: str = "VFDPXA"
    ) -> List[str]:
        """
        Return all sequences of transitions between two states

        Args:
            start: the start state, default="vfdpxa"
            end: the end state, default="VFDPXA"

        Returns:
            a list of sequences
        """
        sequences = []
        for path in self.paths_between(start=start, end=end):
            seq = self.transitions_in_path(path)
            sequences.append(seq)
        return sequences

    def transitions_in_path(self, path: list) -> Tuple[str, ...]:
        """
        Return the transitions in a path

        Args:
            path: a list of graph edges

        Returns:
            a tuple of the edge labels
        """
        G = self.G
        # path is a list of (node1,node2) edge tuples
        # the third positional argument of get_edge_data is a default value,
        # not a key, so the label is looked up from the returned dict instead
        data = (G.get_edge_data(u, v) for (u, v) in path)
        labels = (d["label"] for d in data)
        seq = tuple(labels)
        return seq

    @ensure_valid_state
    def walk_from(self, start: str = None, end: str = "VFDPXA") -> tuple:
        """
        Randomly walk from a given state to a given state

        Args:
            start: the start state, default=None
            end: the end state, default="VFDPXA"

        Returns:
            a tuple of the path and the probabilities of each step
        """
        current = start
        path = []
        probabilities = []
        while current != end:
            neighbors = list(self.next_state(current))

            p = 0
            n = len(neighbors)
            if n:
                p = 1 / n

            next_state = random.choice(neighbors)
            step = (current, next_state)
            path.append(step)
            probabilities.append(p)
            current = next_state

        return path, probabilities

    def _compute_h_frequencies(self):
        h_coverage = {}
        # initialize coverage
        for h in self.H:
            h_coverage[h] = None

        count = 0
        while any([v is None for v in h_coverage.values()]):
            path, probs = self.walk_from(start="vfdpxa")
            h = self.transitions_in_path(path)
            h = "".join(h)

            # compute path probability

            p = np.prod(probs)

            h_coverage[h] = p
            count += 1

        self._rounds_to_cover = count
        return h_coverage

    def _compute_tfidf(self) -> pd.DataFrame:
        """
        Compute tf-idf scores for each desiderata in each history

        Returns:
            a dataframe with tf-idf scores for each desiderata in each history

        """
        df = pd.DataFrame(self.H_df)
        tfidf_cols = []

        assert len(self.d_cols) > 0

        for c in self.d_cols:
            wcol = f"w{c}"

            # total weight of the desiderata across all histories
            # weighted by history frequency
            docfreq = sum(df[wcol])

            numerator = 1 + 1
            denominator = docfreq + 1  # avoid div by 0
            idf = np.log(numerator / denominator) + 1

            tfidf_col = f"tfidf_{c}"
            tfidf_cols.append(tfidf_col)

            # df[c] = 1 when d is present in a history, 0 when absent
            df[tfidf_col] = df[c] * idf

        # total tf-idf is the sum of the scores for individual terms
        df["tfidf"] = df[tfidf_cols].sum(axis=1)
        # we don't need the individual tf-idf cols anymore
        df = df.drop(columns=tfidf_cols)

        # assign ranks and sort by rank
        df["rank"] = df["tfidf"].rank(method="dense")
        df = df.sort_values(by="rank").reset_index(drop=True)

        return df

    def _compute_s_scores(self) -> dict:
        """
        Compute the s scores for each state

        Returns:
            a dict of state: s score
        """
        score = {}
        for s in self.states:
            score[s] = self.score_state(s)
        return score

    def _init_H_df(self) -> pd.DataFrame:
        """
        Initialize the history dataframe

        Returns:
            a dataframe of histories and their scores

        Raises:
            CVDmodelError: if the history probabilities are uninitialized
        """
        # make sure we have the info we need before proceeding
        if self.H_prob is None:
            raise CVDmodelError("History probabilities are uninitialized")

        data = []
        d_cols = set()
        w_cols = set()
        for h, p in self.H_prob.items():
            # h is the history string of six events
            # p is the frequency of that history when we random walk the graph
            row = {"h": h, "p": p}

            # walk the desiderata set for this history
            D_h = self._assess_hist(h)

            for d, is_met in D_h.items():
                # d is a tuple of A,B where A<B
                # is_met is true/false, we want it as an int
                is_met = int(is_met)

                # simple unweighted history-vs-desiderata columns
                col = "<".join(d)
                row[col] = is_met
                d_cols.add(col)

                # history-vs-desiderata columns weighted by history likelihood
                col2 = f"w{col}"
                row[col2] = p * is_met
                w_cols.add(col2)

            data.append(row)

        self.d_cols = sorted(list(d_cols))
        self.w_cols = sorted(list(w_cols))

        df = pd.DataFrame(data)
        return df

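    def _compute_f_d(self) -> dict:
        """
        Compute the frequency of each desideratum, weighted by the
        frequency of the histories in which it is met

        Returns:
            a dict of desiderata: frequency
        """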
        _f_d = self.H_df[self.w_cols].sum()

        f_d = {}
        for k, v in _f_d.items():
            k = k.replace("w", "")
            a, b = k.split("<")
            new_k = (a, b)
            f_d[new_k] = v

        return f_d

    def _compute_f_d_orig(self):
        f_d = self.H_df[self.d_cols].mean()
        return f_d

    def _assess_hist(self, h) -> dict:
        """
        Assess a history against the desiderata
        Args:
            h: the history to assess

        Returns:
            a dict of desiderata: is_met

        Raises:
            ScoringError: if the history is invalid
        """
        try:
            is_valid_history(h)
        except HistoryValidationError:
            raise ScoringError(f"Invalid history {h}")

        D_h = {(e1, e2): h.index(e1) < h.index(e2) for (e1, e2) in self._D}
        return D_h

    def score_hist(self, h: str) -> float:
        """
        Compute the score for a given history

        Args:
            h: the history to score

        Returns:
            the score for the history
        """
        # this is basically computing the dot product of
        # D_h dot (1-f_d)
        D_h = self._assess_hist(h)

        score = 0
        for k, v in D_h.items():
            if v:
                score += 1 - self.f_d[k]
        return score

    def d_to_col(self, d):
        # d is a tuple (a,b)
        return "<".join(d)

    def _init_sdf(self):
        data = []
        patcols = set()
        for s in self.states:
            row = {
                "state": s,
                "start_embargo": can_start_embargo(s),
                "embargo_viable": embargo_viable(s),
            }
            good, bad = self._assess_state(s)

            for pat, score in good.items():
                # convert pattern into colname
                k = pat.pattern
                d = self.state_pattern_to_d[k]
                col = self.d_to_col(d)
                row[col] = 1 - score
                patcols.add(col)

            for pat, score in bad.items():
                # convert pattern into colname
                k = pat.pattern
                d = self.state_pattern_to_not_d[k]
                col = self.d_to_col(d)
                row[col] = -(1 - score)
                patcols.add(col)

            data.append(row)

        # convert the columns into a list
        patcols = list(patcols)

        df = pd.DataFrame(data).fillna(0)
        df = df.set_index("state")

        # sum across all columns to get net state score
        df["score"] = df[patcols].sum(axis=1)
        df["rank"] = df["score"].rank(method="dense")

        # pagerank returns a dict of state: pr_score so convert to series before adding to df
        pr = self.compute_pagerank()
        prs = pd.Series(pr)
        df["pagerank"] = prs

        return df

    @ensure_valid_state
    def _assess_state(self, state):
        """
        Assess a state against the desiderata
        Args:
            state: the state to assess

        Returns:
            a tuple of (good, bad) patterns
        """
        plus = []
        for p, score in self.state_good.items():
            if p.match(state):
                plus.append((p, score))
        minus = []
        for p, score in self.state_bad.items():
            if p.match(state):
                minus.append((p, score))
        return dict(plus), dict(minus)

    @ensure_valid_state
    def _part_score_state(self, state):
        good, bad = self._assess_state(state)
        plus = sum([1 - g for g in good.values()])
        minus = sum([1 - g for g in bad.values()])

        res = {"plus": plus, "minus": minus}
        return res

    @ensure_valid_state
    def score_state(self, state):
        part = self._part_score_state(state)
        # net = part['plus'] - part['minus']
        net = part["plus"]
        return net

    def _construct_good_patterns(self):
        # construct a state regex pattern to correspond to each desiderata
        idx = self.idx

        g = {}
        for d in self._D:
            # a desiderata is a preference that one event precedes another
            # a and b are an ordered pair of transitions
            (a, b) = d

            # start with a wildcard pattern
            pat = list("......")

            # map a and b to their expected positions in the pattern
            idx1 = idx[a]
            idx2 = idx[b]

            # a precedes b is desired
            # which means we want states where a is capitalized and b is lowercase
            pat[idx1] = a.upper()
            pat[idx2] = b.lower()

            # reassemble the pattern characters into a string
            pat = "".join(pat)
            g[d] = pat

        return g

    def _construct_bad_patterns(self):
        g = self._construct_good_patterns()

        _b = {(b, a): v.swapcase() for (a, b), v in g.items()}
        return _b

    def _build_good(self):
        g = self._construct_good_patterns()
        f_d = self.f_d

        # convert f_d into regex patterns
        _g = {re.compile(g[k]): v for k, v in f_d.items()}

        return _g

    def _build_bad(self):
        g = self._construct_good_patterns()
        f_d = self.f_d

        # bad just inverts the pattern of good
        _b = {re.compile(g[k].swapcase()): (1 - v) for k, v in f_d.items()}

        return _b

    def _construct_index(self):
        # build a simple index of which position vfdpxa appear in the string
        _idx = {c: i for i, c in enumerate("vfdpxa")}
        idx = {}
        for k, v in _idx.items():
            k_upper = k.upper()
            idx[k] = v
            idx[k_upper] = v

        assert len(idx) == 12
        return idx

    def state_adjacency_matrix(self) -> pd.DataFrame:
        """
        Return the state adjacency matrix for the model

        Returns:
            a dataframe of the state adjacency matrix
        """
        G = self.G
        rows = G.nodes()
        cols = rows
        df = pd.DataFrame(
            nx.linalg.adjacency_matrix(G).todense(), index=rows, columns=cols
        )
        return df

    def state_transition_matrix(self) -> pd.DataFrame:
        """
        Return the state transition matrix for the model
        Returns:
            a dataframe of the state transition matrix
        """
        adj_df = self.state_adjacency_matrix()

        df = pd.DataFrame(adj_df)
        # normalize adjacency by the number of next hops
        df = df.div(df.sum(axis=1), axis=0).fillna(0)
        return df

    def find_states(self, pat: str) -> list:
        """
        Find states that match a given pattern
        Args:
            pat: a regex pattern to match

        Returns:
            a list of states that match the pattern

        Raises:
            CVDmodelError: if the pattern is invalid
        """
        try:
            is_valid_pattern(pat)
        except PatternValidationError as e:
            raise CVDmodelError(e)

        matches = []
        for state in self.states:
            if re.match(pat, state):
                matches.append(state)
        return matches

    @ensure_valid_state
    def move_score(self, from_state: str, to_state: str) -> float:
        """
        Compute the score of moving from one state to another

        Args:
            from_state: The state to move from
            to_state: The state to move to

        Returns:
            the score of the move, or None if the transition is not valid
        """
        try:
            is_valid_transition(from_state, to_state)
        except TransitionValidationError:
            return None

        curr_score = self.score_state(from_state)
        next_score = self.score_state(to_state)
        delta = next_score - curr_score
        return delta

    def compute_pagerank(self) -> dict:
        """
        Compute the pagerank of each state in the model.
        Runs the NetworkX pagerank algorithm on the model graph with a limit of 10000 iterations (max_iter=10000).
        Because the model is a directed graph, we need to add a wraparound link so that the pagerank algorithm can walk
        from the end back to the beginning naturally.

        Returns:
            a dict of state: pagerank

        """
        # copy the graph since we're going to modify it
        G = nx.DiGraph(self.G)

        # add a wraparound link
        # this allows page rank to walk from the end back to the beginning naturally
        # and avoids biasing the pagerank algorithm against the early states
        G.add_edge("VFDPXA", "vfdpxa")

        # compute pagerank
        pr = nx.pagerank(G, max_iter=10000)

        # return a dict of node: pr
        return pr
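
The random walk used by `walk_from` and `_compute_h_frequencies` assigns each step a probability of 1/n, where n is the number of successors of the current state, and multiplies the step probabilities to get the probability of the whole path. A minimal sketch of that idea on a toy graph follows. The graph, the `random_walk` helper, and the seeded generator are all hypothetical and stand in for the model's state graph.

```python
import math
import random

# Hypothetical toy graph: each key maps to its possible next states.
TOY_SUCCESSORS = {
    "start": ["left", "right"],
    "left": ["end"],
    "right": ["mid"],
    "mid": ["end"],
    "end": [],
}


def random_walk(successors, start, end, rng):
    """Walk from start to end, choosing uniformly among successors at each step."""
    current = start
    path, probabilities = [], []
    while current != end:
        neighbors = successors[current]
        if not neighbors:
            # guard against dead ends so random choice never sees an empty list
            raise ValueError(f"dead end at {current!r} before reaching {end!r}")
        nxt = rng.choice(neighbors)
        path.append((current, nxt))
        probabilities.append(1 / len(neighbors))
        current = nxt
    return path, probabilities


rng = random.Random(0)  # seeded only to make the sketch repeatable
path, probs = random_walk(TOY_SUCCESSORS, "start", "end", rng)
path_probability = math.prod(probs)  # product of the per-step probabilities
```

In this toy graph the only branching point is `start`, so either path has probability 0.5 regardless of which one the walk takes.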

histories: list property

A list of all possible case histories traversing the case state space from vfdpxa to VFDPXA.

states: list property

Returns the states in the model

compute_pagerank()

Compute the pagerank of each state in the model. Runs the NetworkX pagerank algorithm on the model graph with a limit of 10000 iterations (max_iter=10000). Because the model is a directed graph, we need to add a wraparound link so that the pagerank algorithm can walk from the end back to the beginning naturally.

Returns:

Type Description
dict

a dict of state: pagerank

Source code in vultron/case_states/hypercube.py
def compute_pagerank(self) -> dict:
    """
    Compute the pagerank of each state in the model.
    Runs the NetworkX pagerank algorithm on the model graph with a limit of 10000 iterations (max_iter=10000).
    Because the model is a directed graph, we need to add a wraparound link so that the pagerank algorithm can walk
    from the end back to the beginning naturally.

    Returns:
        a dict of state: pagerank

    """
    # copy the graph since we're going to modify it
    G = nx.DiGraph(self.G)

    # add a wraparound link
    # this allows page rank to walk from the end back to the beginning naturally
    # and avoids biasing the pagerank algorithm against the early states
    G.add_edge("VFDPXA", "vfdpxa")

    # compute pagerank
    pr = nx.pagerank(G, max_iter=10000)

    # return a dict of node: pr
    return pr
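
To see why the wraparound link matters, and that `max_iter` is an upper bound on iterations rather than a repeat count, here is a small pure-Python power iteration on a three-node chain. This is a sketch under stated assumptions, not NetworkX's implementation: the `pagerank` function, its `damping` and `tol` parameters, and the toy graph are all hypothetical.

```python
def pagerank(edges, nodes, damping=0.85, max_iter=10000, tol=1e-10):
    """Plain power-iteration PageRank that stops early once the ranks converge."""
    n = len(nodes)
    out = {u: [v for (a, v) in edges if a == u] for u in nodes}
    rank = {u: 1 / n for u in nodes}
    for _ in range(max_iter):
        # rank held by nodes without out-edges is spread evenly over all nodes
        dangling = sum(rank[u] for u in nodes if not out[u])
        new = {u: (1 - damping) / n + damping * dangling / n for u in nodes}
        for u in nodes:
            for v in out[u]:
                new[v] += damping * rank[u] / len(out[u])
        converged = sum(abs(new[u] - rank[u]) for u in nodes) < tol
        rank = new
        if converged:
            break
    return rank


nodes = ["a", "b", "c"]
chain = [("a", "b"), ("b", "c")]

without_wrap = pagerank(chain, nodes)               # rank piles up toward "c"
with_wrap = pagerank(chain + [("c", "a")], nodes)   # link from the end back to the start
```

Without the extra edge the first node of the chain ends up with the lowest rank. With it, the three nodes form a cycle and share the rank equally, which is the bias the wraparound link is meant to remove.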

find_states(pat)

Find states that match a given pattern

Parameters:

Name Type Description Default
pat str

a regex pattern to match

required

Returns:

Type Description
list

a list of states that match the pattern

Raises:

Type Description
CVDmodelError

if the pattern is invalid

Source code in vultron/case_states/hypercube.py
def find_states(self, pat: str) -> list:
    """
    Find states that match a given pattern
    Args:
        pat: a regex pattern to match

    Returns:
        a list of states that match the pattern

    Raises:
        CVDmodelError: if the pattern is invalid
    """
    try:
        is_valid_pattern(pat)
    except PatternValidationError as e:
        raise CVDmodelError(e)

    matches = []
    for state in self.states:
        if re.match(pat, state):
            matches.append(state)
    return matches
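
Patterns passed to `find_states` are six-character regular expressions in which `.` leaves a position unconstrained. The sketch below builds such a pattern from a desideratum, in the same way `_construct_good_patterns` does, and matches it against candidate state strings. It assumes, for illustration only, that every upper/lower combination of `vfdpxa` is a candidate; the real model may contain fewer states. The names `good_pattern` and `find_matching` are hypothetical.

```python
import re
from itertools import product

# Assumption: enumerate every upper/lower combination of "vfdpxa" (64 strings).
# The real model may contain fewer states than this.
ALL_CASE_COMBOS = [
    "".join(chars) for chars in product(*((c, c.upper()) for c in "vfdpxa"))
]

# position of each event letter in a state string
POSITION = {c: i for i, c in enumerate("VFDPXA")}


def good_pattern(desideratum):
    """Regex for states where A has happened (upper case) and B has not (lower case)."""
    a, b = desideratum
    pat = list("......")
    pat[POSITION[a]] = a.upper()
    pat[POSITION[b]] = b.lower()
    return "".join(pat)


def find_matching(pattern, states):
    """Return the states whose name matches the pattern from the first character."""
    return [s for s in states if re.match(pattern, s)]


pattern = good_pattern(("V", "P"))  # vendor aware, public not yet aware
matches = find_matching(pattern, ALL_CASE_COMBOS)
```

Swapping the case of the pattern (`pattern.swapcase()`) gives the corresponding undesired pattern, which is how the module derives its "bad" patterns from the "good" ones.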

move_score(from_state, to_state)

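Compute the score of moving from one state to another

Parameters:

Name Type Description Default
from_state str

The state to move from

required
to_state str

The state to move to

required

Returns:

Type Description
float

the score of the move, or None if the transition is not valid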

Source code in vultron/case_states/hypercube.py
@ensure_valid_state
def move_score(self, from_state: str, to_state: str) -> float:
    """
    Compute the score of moving from one state to another

    Args:
        from_state: The state to move from
        to_state: The state to move to

    Returns:
        the score of the move, or None if the transition is not valid
    """
    try:
        is_valid_transition(from_state, to_state)
    except TransitionValidationError:
        return None

    curr_score = self.score_state(from_state)
    next_score = self.score_state(to_state)
    delta = next_score - curr_score
    return delta
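
The move score is simply the difference between the scores of the destination and origin states, with `None` signalling a move that is not a valid transition. The sketch below illustrates that contract with made-up numbers. `TOY_SCORES` is hypothetical, and `is_single_step` is an assumed stand-in for `is_valid_transition`, which may enforce additional rules.

```python
from typing import Optional

# Hypothetical scores for three states; real values come from score_state().
TOY_SCORES = {"vfdpxa": 0.0, "Vfdpxa": 1.5, "VFdpxa": 2.25}


def is_single_step(from_state: str, to_state: str) -> bool:
    """Assumed validity rule: exactly one letter flips from lower to upper case."""
    if len(from_state) != len(to_state):
        return False
    diffs = [(a, b) for a, b in zip(from_state, to_state) if a != b]
    return len(diffs) == 1 and diffs[0][0].islower() and diffs[0][1] == diffs[0][0].upper()


def move_score(from_state: str, to_state: str) -> Optional[float]:
    """Score delta of a move, or None when the move is not a single valid step."""
    if not is_single_step(from_state, to_state):
        return None
    return TOY_SCORES[to_state] - TOY_SCORES[from_state]


step = move_score("Vfdpxa", "VFdpxa")  # one event occurs
jump = move_score("vfdpxa", "VFdpxa")  # two events at once, not a single step
```

Callers therefore need to handle `None` before comparing or summing move scores.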

paths_between(start='vfdpxa', end='VFDPXA')

Return all paths of transitions between two states

Parameters:

Name Type Description Default
start str

the start state, default="vfdpxa"

'vfdpxa'
end str

the end state, default="VFDPXA"

'VFDPXA'

Returns:

Type Description
List[tuple]

a list of paths

Source code in vultron/case_states/hypercube.py
@ensure_valid_state
def paths_between(
    self, start: str = "vfdpxa", end: str = "VFDPXA"
) -> List[tuple]:
    """
    Return all paths of transitions between two states

    Args:
        start: the start state, default="vfdpxa"
        end: the end state, default="VFDPXA"

    Returns:
        a list of paths
    """
    G = self.G
    return nx.all_simple_edge_paths(G, start, end)
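
A path here is a list of `(u, v)` edges, and a simple path never visits the same node twice. As an illustration of what `paths_between` enumerates, the following depth-first sketch yields every simple edge path in a small toy graph. The `simple_edge_paths` function and the graph are hypothetical; the module itself delegates this work to NetworkX, whose `all_simple_edge_paths` produces paths lazily, so the sketch is written as a generator too.

```python
def simple_edge_paths(successors, start, end):
    """Yield each simple path from start to end as a list of (u, v) edges."""

    def extend(node, visited, edges):
        if node == end:
            yield list(edges)  # copy, because edges is mutated while backtracking
            return
        for nxt in successors.get(node, []):
            if nxt in visited:
                continue  # a simple path never revisits a node
            edges.append((node, nxt))
            visited.add(nxt)
            yield from extend(nxt, visited, edges)
            visited.remove(nxt)
            edges.pop()

    yield from extend(start, {start}, [])


# Hypothetical toy graph with three routes from "s" to "t".
TOY = {"s": ["a", "b"], "a": ["t"], "b": ["a", "t"]}
paths = list(simple_edge_paths(TOY, "s", "t"))
```

Because the result is produced lazily, wrap it in `list(...)` when the paths need to be counted or traversed more than once.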

paths_from(state='vfdpxa')

Return all paths of transitions that lead from a given state

Parameters:

Name Type Description Default
state str

the state to return paths from, default="vfdpxa"

'vfdpxa'

Returns:

Type Description
List[tuple]

a list of paths

Source code in vultron/case_states/hypercube.py
@ensure_valid_state
def paths_from(self, state: str = "vfdpxa") -> List[tuple]:
    """
    Return all paths of transitions that lead from a given state

    Args:
        state: the state to return paths from, default="vfdpxa"

    Returns:
        a list of paths
    """
    return self.paths_between(start=state, end="VFDPXA")

paths_to(state='VFDPXA')

Return all paths of transitions that lead to a given state

Parameters:

Name Type Description Default
state str

the state to return paths to, default="VFDPXA"

'VFDPXA'

Returns:

Type Description
List[tuple]

a list of paths

Source code in vultron/case_states/hypercube.py
@ensure_valid_state
def paths_to(self, state: str = "VFDPXA") -> List[tuple]:
    """
    Return all paths of transitions that lead to a given state

    Args:
        state: the state to return paths to, default="VFDPXA"

    Returns:
        a list of paths
    """
    return self.paths_between(start="vfdpxa", end=state)

previous_state(state)

For a given state, return the previous state(s)

Parameters:

Name Type Description Default
state str

the state to return the previous state(s) for

required

Returns:

Type Description
list

a list of previous states

Source code in vultron/case_states/hypercube.py
@ensure_valid_state
def previous_state(self, state: str) -> list:
    """
    For a given state, return the previous state(s)

    Args:
        state: the state to return the previous state(s) for

    Returns:
        a list of previous states
    """
    # predecessors() yields an iterator; materialize it to match the declared return type
    return list(self.G.predecessors(state))

score_hist(h)

Compute the score for a given history

Parameters:

Name Type Description Default
h str

the history to score

required

Returns:

Type Description
float

the score for the history

Source code in vultron/case_states/hypercube.py
def score_hist(self, h: str) -> float:
    """
    Compute the score for a given history

    Args:
        h: the history to score

    Returns:
        the score for the history
    """
    # this is basically computing the dot product of
    # D_h dot (1-f_d)
    D_h = self._assess_hist(h)

    score = 0
    for k, v in D_h.items():
        if v:
            score += 1 - self.f_d[k]
    return score

sequences_between(start='vfdpxa', end='VFDPXA')

Return all sequences of transitions between two states

Parameters:

Name Type Description Default
start str

the start state, default="vfdpxa"

'vfdpxa'
end str

the end state, default="VFDPXA"

'VFDPXA'

Returns:

Type Description
List[str]

a list of sequences

Source code in vultron/case_states/hypercube.py
@ensure_valid_state
def sequences_between(
    self, start: str = "vfdpxa", end: str = "VFDPXA"
) -> List[str]:
    """
    Return all sequences of transitions between two states

    Args:
        start: the start state, default="vfdpxa"
        end: the end state, default="VFDPXA"

    Returns:
        a list of sequences
    """
    sequences = []
    for path in self.paths_between(start=start, end=end):
        seq = self.transitions_in_path(path)
        sequences.append(seq)
    return sequences

sequences_from(state='vfdpxa')

Return all sequences of transitions that lead from a given state

Parameters:

Name Type Description Default
state str

the state to return sequences from

'vfdpxa'

Returns:

Type Description
List[str]

a list of sequences

Source code in vultron/case_states/hypercube.py
def sequences_from(self, state: str = "vfdpxa") -> List[str]:
    """
    Return all sequences of transitions that lead from a given state

    Args:
        state: the state to return sequences from

    Returns:
        a list of sequences
    """
    return self.sequences_between(start=state)

sequences_to(state)

Return all sequences of transitions that lead to a given state

Parameters:

Name Type Description Default
state str

the state to return sequences to

required

Returns:

Type Description
List[str]

a list of sequences

Source code in vultron/case_states/hypercube.py
@ensure_valid_state
def sequences_to(self, state: str) -> List[str]:
    """
    Return all sequences of transitions that lead to a given state

    Args:
        state: the state to return sequences to

    Returns:
        a list of sequences

    """
    return self.sequences_between(end=state)

state_adjacency_matrix()

Return the state adjacency matrix for the model

Returns:

Type Description
DataFrame

a dataframe of the state adjacency matrix

Source code in vultron/case_states/hypercube.py
def state_adjacency_matrix(self) -> pd.DataFrame:
    """
    Return the state adjacency matrix for the model

    Returns:
        a dataframe of the state adjacency matrix
    """
    G = self.G
    rows = G.nodes()
    cols = rows
    df = pd.DataFrame(
        nx.linalg.adjacency_matrix(G).todense(), index=rows, columns=cols
    )
    return df

state_info(state)

Returns the info for a given state

Parameters:

Name Type Description Default
state str

the state to return info for

required

Returns:

Type Description
dict

a dict of info for the state

Source code in vultron/case_states/hypercube.py
@ensure_valid_state
def state_info(self, state: str) -> dict:
    """
    Returns the info for a given state

    Args:
        state: the state to return info for

    Returns:
        a dict of info for the state
    """

    data = {
        "state": state,
        "score": self.score_state(state),
        "vfd": vfd(state),
        "pxa": pxa(state),
        "predecessors": list(self.previous_state(state)),
        "successors": list(self.next_state(state)),
        "explain": explain(state),
        "info": info(state),
        "actions": action(state),
        "zeroday_type": zeroday_type(state),
    }
    return data

state_transition_matrix()

Return the state transition matrix for the model

Returns:

Type Description
DataFrame

a dataframe of the state transition matrix

Source code in vultron/case_states/hypercube.py
def state_transition_matrix(self) -> pd.DataFrame:
    """
    Return the state transition matrix for the model

    Returns:
        a dataframe of the state transition matrix
    """
    adj_df = self.state_adjacency_matrix()

    df = pd.DataFrame(adj_df)
    # normalize adjacency by the number of next hops
    df = df.div(df.sum(axis=1), axis=0).fillna(0)
    return df
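
The row normalization performed by `df.div(df.sum(axis=1), axis=0).fillna(0)` can be sketched in plain Python. This is an illustrative sketch on a made-up three-node graph (the names `s0`, `s1`, `s2` and the helper `normalize_rows` are hypothetical), not the library's code: each row is divided by its number of outgoing edges, and a row with no outgoing edges stays all zero.

```python
from typing import Dict

Matrix = Dict[str, Dict[str, float]]


def normalize_rows(adj: Matrix) -> Matrix:
    """Divide each row by its sum; rows that sum to zero stay all zero."""
    out: Matrix = {}
    for src, row in adj.items():
        total = sum(row.values())
        out[src] = {dst: (v / total if total else 0.0) for dst, v in row.items()}
    return out


# Toy adjacency matrix: row = source node, column = destination node.
adjacency: Matrix = {
    "s0": {"s0": 0, "s1": 1, "s2": 1},
    "s1": {"s0": 0, "s1": 0, "s2": 1},
    "s2": {"s0": 0, "s1": 0, "s2": 0},  # sink: no outgoing edges
}

transition = normalize_rows(adjacency)
print(transition["s0"])  # two equally likely next hops
```

Each non-sink row of the result sums to 1, so it can be read as a uniform distribution over the next hops from that node.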

transitions_in_path(path)

Return the transitions in a path

Parameters:

Name Type Description Default
path list

a list of graph edges

required

Returns:

Type Description
Tuple[str]

a tuple of the edge labels

Source code in vultron/case_states/hypercube.py
def transitions_in_path(self, path: list) -> Tuple[str]:
    """
    Return the transitions in a path

    Args:
        path: a list of graph edges

    Returns:
        a tuple of the edge labels
    """
    G = self.G
    # path is a list of (node1,node2) edge tuples
    data = (G.get_edge_data(u, v, "label") for (u, v) in path)
    labels = (d["label"] for d in data)
    seq = tuple(labels)
    return seq
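
The mapping from a path of edges to a sequence of labels can be sketched without a graph library. In this sketch the dictionary `EDGE_LABELS` stands in for the `label` attribute stored on graph edges; the two labels shown are assumptions for illustration and are not taken from the model's actual edge data.

```python
from typing import Dict, List, Tuple

Edge = Tuple[str, str]

# Hypothetical labels for a two-step path; the real labels live on graph edges.
EDGE_LABELS: Dict[Edge, str] = {
    ("vfdpxa", "Vfdpxa"): "V",
    ("Vfdpxa", "VFdpxa"): "F",
}


def labels_in_path(path: List[Edge]) -> Tuple[str, ...]:
    """Look up the label of every (source, destination) edge in order."""
    return tuple(EDGE_LABELS[edge] for edge in path)


path = [("vfdpxa", "Vfdpxa"), ("Vfdpxa", "VFdpxa")]
print(labels_in_path(path))
```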

walk_from(start=None, end='VFDPXA')

Randomly walk from a start state to an end state

Parameters:

Name Type Description Default
start str

the start state, default=None

None
end str

the end state, default="VFDPXA"

'VFDPXA'

Returns:

Type Description
tuple

a tuple of the path and the probabilities of each step

Source code in vultron/case_states/hypercube.py
@ensure_valid_state
def walk_from(self, start: str = None, end: str = "VFDPXA") -> tuple:
    """
    Randomly walk from a start state to an end state

    Args:
        start: the start state, default=None
        end: the end state, default="VFDPXA"

    Returns:
        a tuple of the path and the probabilities of each step
    """
    current = start
    path = []
    probabilities = []
    while current != end:
        neighbors = list(self.next_state(current))

        p = 0
        n = len(neighbors)
        if n:
            p = 1 / n

        next_state = random.choice(neighbors)
        step = (current, next_state)
        path.append(step)
        probabilities.append(p)
        current = next_state

    return path, probabilities
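
The walk can be sketched on a toy successor map. This is a sketch under stated assumptions rather than the method itself: `SUCCESSORS` is a made-up four-node graph, the random source is passed in explicitly, and the sketch raises a `ValueError` when it reaches a node with no successors before `end`, which is a choice made here for illustration.

```python
import random
from typing import Dict, List, Tuple

# Toy successor map (an assumption for illustration, not the CVD state graph).
SUCCESSORS: Dict[str, List[str]] = {
    "start": ["left", "right"],
    "left": ["end"],
    "right": ["end"],
    "end": [],
}


def walk(
    start: str, end: str, rng: random.Random
) -> Tuple[List[Tuple[str, str]], List[float]]:
    """Pick a uniformly random successor at each step until `end` is reached."""
    current = start
    path: List[Tuple[str, str]] = []
    probabilities: List[float] = []
    while current != end:
        neighbors = SUCCESSORS[current]
        if not neighbors:
            # dead end reached before `end`
            raise ValueError(f"no way onward from {current!r}")
        nxt = rng.choice(neighbors)
        path.append((current, nxt))
        probabilities.append(1 / len(neighbors))
        current = nxt
    return path, probabilities


path, probs = walk("start", "end", random.Random(0))
print(path, probs)
```

Multiplying the per-step probabilities gives the probability of the whole path under a uniform choice at each step.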

vultron.case_states.states

The vultron.case_states.states module implements the CVD Case State Model enums.

It also provides functions for converting between state strings and enums.

AttackObservation

Bases: IntEnum

Represents the attack observation state of a case.

Source code in vultron/case_states/states.py
class AttackObservation(IntEnum):
    """
    Represents the attack observation state of a case.
    """

    NO_ATTACKS_OBSERVED = 0
    ATTACKS_OBSERVED = 1

    NO = NO_ATTACKS_OBSERVED
    YES = ATTACKS_OBSERVED

    a = NO_ATTACKS_OBSERVED
    A = ATTACKS_OBSERVED

CS_pxa

Bases: Enum

Represents the public state of a case.

  • pxa indicates the public is unaware, no exploit has been published, and no attacks have been observed.
  • Pxa indicates the public is aware, no exploit has been published, and no attacks have been observed.
  • pxA indicates the public is unaware, no exploit has been published, and attacks have been observed.
  • PxA indicates the public is aware, no exploit has been published, and attacks have been observed.
  • PXa indicates the public is aware, an exploit has been published, and no attacks have been observed.
  • PXA indicates the public is aware, an exploit has been published, and attacks have been observed.

Note that pXa and pXA are not valid states because once an exploit is published, the public is aware.

Source code in vultron/case_states/states.py
class CS_pxa(Enum):
    """Represents the public state of a case.

    - `pxa` indicates the public is unaware, no exploit has been published, and no attacks have been observed.
    - `Pxa` indicates the public is aware, no exploit has been published, and no attacks have been observed.
    - `pxA` indicates the public is unaware, no exploit has been published, and attacks have been observed.
    - `PxA` indicates the public is aware, no exploit has been published, and attacks have been observed.
    - `PXa` indicates the public is aware, an exploit has been published, and no attacks have been observed.
    - `PXA` indicates the public is aware, an exploit has been published, and attacks have been observed.

    Note that pXa and pXA are not valid states because once an exploit is published, the public is aware.
    """

    # pxa
    pxa = PxaState(
        PublicAwareness.PUBLIC_UNAWARE,
        ExploitPublication.NO_PUBLIC_EXPLOIT,
        AttackObservation.NO_ATTACKS_OBSERVED,
    )
    # Pxa
    Pxa = PxaState(
        PublicAwareness.PUBLIC_AWARE,
        ExploitPublication.NO_PUBLIC_EXPLOIT,
        AttackObservation.NO_ATTACKS_OBSERVED,
    )
    # pxA
    pxA = PxaState(
        PublicAwareness.PUBLIC_UNAWARE,
        ExploitPublication.NO_PUBLIC_EXPLOIT,
        AttackObservation.ATTACKS_OBSERVED,
    )
    # PxA
    PxA = PxaState(
        PublicAwareness.PUBLIC_AWARE,
        ExploitPublication.NO_PUBLIC_EXPLOIT,
        AttackObservation.ATTACKS_OBSERVED,
    )
    # pXa
    pXa = PxaState(
        PublicAwareness.PUBLIC_UNAWARE,
        ExploitPublication.EXPLOIT_PUBLIC,
        AttackObservation.NO_ATTACKS_OBSERVED,
    )
    # PXa
    PXa = PxaState(
        PublicAwareness.PUBLIC_AWARE,
        ExploitPublication.EXPLOIT_PUBLIC,
        AttackObservation.NO_ATTACKS_OBSERVED,
    )
    # pXA
    pXA = PxaState(
        PublicAwareness.PUBLIC_UNAWARE,
        ExploitPublication.EXPLOIT_PUBLIC,
        AttackObservation.ATTACKS_OBSERVED,
    )
    # PXA
    PXA = PxaState(
        PublicAwareness.PUBLIC_AWARE,
        ExploitPublication.EXPLOIT_PUBLIC,
        AttackObservation.ATTACKS_OBSERVED,
    )

CS_vfd

Bases: Enum

Represents the vendor fix path state of a case.

  • vfd indicates the vendor is unaware, no fix is ready and no fix is deployed.
  • Vfd indicates the vendor is aware, no fix is ready and no fix is deployed.
  • VFd indicates the vendor is aware, a fix is ready and no fix is deployed.
  • VFD indicates the vendor is aware, a fix is ready and a fix is deployed.

Source code in vultron/case_states/states.py
class CS_vfd(Enum):
    """Represents the vendor fix path state of a case.

    - `vfd` indicates the vendor is unaware, no fix is ready and no fix is deployed.
    - `Vfd` indicates the vendor is aware, no fix is ready and no fix is deployed.
    - `VFd` indicates the vendor is aware, a fix is ready and no fix is deployed.
    - `VFD` indicates the vendor is aware, a fix is ready and a fix is deployed.
    """

    vfd = VfdState(
        VendorAwareness.VENDOR_UNAWARE,
        FixReadiness.FIX_NOT_READY,
        FixDeployment.FIX_NOT_DEPLOYED,
    )
    Vfd = VfdState(
        VendorAwareness.VENDOR_AWARE,
        FixReadiness.FIX_NOT_READY,
        FixDeployment.FIX_NOT_DEPLOYED,
    )
    VFd = VfdState(
        VendorAwareness.VENDOR_AWARE,
        FixReadiness.FIX_READY,
        FixDeployment.FIX_NOT_DEPLOYED,
    )
    VFD = VfdState(
        VendorAwareness.VENDOR_AWARE,
        FixReadiness.FIX_READY,
        FixDeployment.FIX_DEPLOYED,
    )

ExploitPublication

Bases: IntEnum

Represents the exploit publication state of a case.

Source code in vultron/case_states/states.py
class ExploitPublication(IntEnum):
    """
    Represents the exploit publication state of a case.
    """

    NO_PUBLIC_EXPLOIT = 0
    EXPLOIT_PUBLIC = 1

    NO = NO_PUBLIC_EXPLOIT
    YES = EXPLOIT_PUBLIC

    x = NO_PUBLIC_EXPLOIT
    X = EXPLOIT_PUBLIC

FixDeployment

Bases: IntEnum

Represents the fix deployment state of a case.

Source code in vultron/case_states/states.py
class FixDeployment(IntEnum):
    """
    Represents the fix deployment state of a case.
    """

    FIX_NOT_DEPLOYED = 0
    FIX_DEPLOYED = 1

    NO = FIX_NOT_DEPLOYED
    YES = FIX_DEPLOYED

    d = FIX_NOT_DEPLOYED
    D = FIX_DEPLOYED

FixReadiness

Bases: IntEnum

Represents the fix readiness state of a case.

Source code in vultron/case_states/states.py
class FixReadiness(IntEnum):
    """
    Represents the fix readiness state of a case.
    """

    FIX_NOT_READY = 0
    FIX_READY = 1

    NO = FIX_NOT_READY
    YES = FIX_READY

    f = FIX_NOT_READY
    F = FIX_READY

PublicAwareness

Bases: IntEnum

Represents the public awareness state of a case.

Source code in vultron/case_states/states.py
class PublicAwareness(IntEnum):
    """
    Represents the public awareness state of a case.
    """

    PUBLIC_UNAWARE = 0
    PUBLIC_AWARE = 1

    NO = PUBLIC_UNAWARE
    YES = PUBLIC_AWARE

    p = PUBLIC_UNAWARE
    P = PUBLIC_AWARE

PxaState

Bases: NamedTuple

Represents the public exploit path state of a case.

Source code in vultron/case_states/states.py
class PxaState(NamedTuple):
    """Represents the public exploit path state of a case."""

    public_awareness: PublicAwareness
    exploit_publication: ExploitPublication
    attack_observation: AttackObservation

State

Bases: NamedTuple

Represents the state of a case.

Source code in vultron/case_states/states.py
class State(NamedTuple):
    """Represents the state of a case."""

    vendor_awareness: VendorAwareness
    fix_readiness: FixReadiness
    fix_deployment: FixDeployment
    public_awareness: PublicAwareness
    exploit_publication: ExploitPublication
    attack_observation: AttackObservation

VendorAwareness

Bases: IntEnum

Represents the vendor awareness state of a case.

Source code in vultron/case_states/states.py
class VendorAwareness(IntEnum):
    """
    Represents the vendor awareness state of a case.
    """

    VENDOR_UNAWARE = 0
    VENDOR_AWARE = 1

    NO = VENDOR_UNAWARE
    YES = VENDOR_AWARE

    v = VENDOR_UNAWARE
    V = VENDOR_AWARE

VfdState

Bases: NamedTuple

Represents the vendor fix path state of a case.

Source code in vultron/case_states/states.py
class VfdState(NamedTuple):
    """Represents the vendor fix path state of a case."""

    vendor_awareness: VendorAwareness
    fix_readiness: FixReadiness
    fix_deployment: FixDeployment

state_string_to_enum2(s)

Convert a state string to a tuple of enums that define the state

Example

state_string_to_enum2('vfdpxa')
returns
( VendorAwareness.VENDOR_UNAWARE,
  FixReadiness.FIX_NOT_READY,
  FixDeployment.FIX_NOT_DEPLOYED,
  PublicAwareness.PUBLIC_UNAWARE,
  ExploitPublication.NO_PUBLIC_EXPLOIT,
  AttackObservation.NO_ATTACKS_OBSERVED)

Parameters:

Name Type Description Default
s str

the state string

required

Returns:

Type Description
Tuple[Enum]

a tuple of Enums

Source code in vultron/case_states/states.py
@ensure_valid_state
def state_string_to_enum2(s: str) -> Tuple[Enum]:
    """
    Convert a state string to a tuple of enums that define the state

    Example:
        ```python
        state_string_to_enum2('vfdpxa')
        ```
        returns
        ```python
        ( VendorAwareness.VENDOR_UNAWARE,
          FixReadiness.FIX_NOT_READY,
          FixDeployment.FIX_NOT_DEPLOYED,
          PublicAwareness.PUBLIC_UNAWARE,
          ExploitPublication.NO_PUBLIC_EXPLOIT,
          AttackObservation.NO_ATTACKS_OBSERVED)
        ```

    Args:
        s: the state string

    Returns:
        a tuple of Enums
    """
    enums = [
        VendorAwareness,
        FixReadiness,
        FixDeployment,
        PublicAwareness,
        ExploitPublication,
        AttackObservation,
    ]

    resolved_enums = []
    for value, enum in zip(s, enums):
        resolved_enums.append(enum[value])

    return tuple(resolved_enums)
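
The lookup `enum[value]` works because each enum defines single-letter aliases for its members, and looking up an alias by name returns the canonical member. A minimal sketch, using trimmed copies of two of the enums documented above (the `NO`/`YES` aliases are omitted for brevity):

```python
from enum import IntEnum


class VendorAwareness(IntEnum):
    VENDOR_UNAWARE = 0
    VENDOR_AWARE = 1

    # aliases: same values, so they resolve to the members above
    v = VENDOR_UNAWARE
    V = VENDOR_AWARE


class FixReadiness(IntEnum):
    FIX_NOT_READY = 0
    FIX_READY = 1

    f = FIX_NOT_READY
    F = FIX_READY


# Resolve each character of a (shortened) state string against its enum.
resolved = tuple(
    enum[ch] for ch, enum in zip("Vf", (VendorAwareness, FixReadiness))
)
print(resolved)
```

Because the aliases share values with the long-named members, `VendorAwareness["V"]` and `VendorAwareness.VENDOR_AWARE` are the same object, and its `name` is `VENDOR_AWARE`.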

state_string_to_enums(s)

Convert a state string to a tuple of enums that define the state (CS_vfd, CS_pxa)

Parameters:

Name Type Description Default
s str

the state string

required

Returns:

Type Description
Tuple[Enum]

a tuple of enums

Source code in vultron/case_states/states.py
@ensure_valid_state
def state_string_to_enums(s: str) -> Tuple[Enum]:
    """
    Convert a state string to a tuple of enums that define the state `(CS_vfd, CS_pxa)`

    Args:
        s: the state string

    Returns:
        a tuple of enums

    """
    (s1, s2) = (s[:3], s[3:])
    vfd = CS_vfd[s1]
    pxa = CS_pxa[s2]
    return (vfd, pxa)

vultron.case_states.validations

This module contains functions to validate the various strings and patterns used by the CVD State Model.

ensure_valid_history(func)

Decorator to ensure a valid history is passed to a function

Example

@ensure_valid_history
def my_func(history, ...):
    ...

Parameters:

Name Type Description Default
func Callable

the function to decorate

required

Returns:

Type Description
Callable

the decorated function

Raises:

Type Description
HistoryValidationError

if the history is invalid

Source code in vultron/case_states/validations.py
def ensure_valid_history(func: Callable) -> Callable:
    """Decorator to ensure a valid history is passed to a function

    Example:
        ```python
        @ensure_valid_history
        def my_func(history, ...):
            ...
        ```
    Args:
        func: the function to decorate

    Returns:
        the decorated function

    Raises:
        HistoryValidationError: if the history is invalid
    """

    def wrapper(*args, **kwargs):
        # get the history from the first arg
        history = args[0]
        try:
            is_valid_history(history)
        except HistoryValidationError as e:
            raise e
        return func(*args, **kwargs)

    return wrapper

ensure_valid_pattern(func)

Function decorator to ensure a valid pattern is passed to a function

Example

@ensure_valid_pattern
def my_func(pattern, ...):
     ...

Parameters:

Name Type Description Default
func Callable

the function to decorate

required

Returns:

Type Description
Callable

the decorated function

Source code in vultron/case_states/validations.py
def ensure_valid_pattern(func: Callable) -> Callable:
    """Function decorator to ensure a valid pattern is passed to a function

    Example:
        ```python
        @ensure_valid_pattern
        def my_func(pattern, ...):
             ...
        ```
    Args:
        func: the function to decorate

    Returns:
        the decorated function
    """

    def wrapper(*args, **kwargs):
        # get the pattern from the first arg
        pat = args[0]
        try:
            is_valid_pattern(pat)
        except PatternValidationError as e:
            raise e
        return func(*args, **kwargs)

    return wrapper

ensure_valid_state(func)

Function Decorator to ensure a valid state is passed to a function

Example

@ensure_valid_state
def my_func(state, ...):
    ...

Parameters:

Name Type Description Default
func Callable

the function to decorate

required

Returns:

Type Description
Callable

the decorated function

Source code in vultron/case_states/validations.py
def ensure_valid_state(func: Callable) -> Callable:
    """Function Decorator to ensure a valid state is passed to a function

    Example:
        ```python
        @ensure_valid_state
        def my_func(state, ...):
            ...
        ```

    Args:
        func: the function to decorate

    Returns:
        the decorated function
    """

    def wrapper(*args, **kwargs):
        # get the state from the first arg
        state = args[0]
        try:
            is_valid_state(state)
        except StateValidationError as e:
            raise e
        return func(*args, **kwargs)

    return wrapper
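
The wrapper shown above does not copy the decorated function's metadata, so the decorated function reports the name `wrapper`. A possible variant using `functools.wraps` is sketched below. This is not the library's implementation: `StateValidationError` and `is_valid_state` are simplified stand-ins defined here so that the sketch runs on its own, and `describe` is a hypothetical function.

```python
import functools
import re
from typing import Callable


class StateValidationError(ValueError):
    """Stand-in for the library's error class (assumed to be an exception)."""


def is_valid_state(state: str) -> None:
    # Simplified stand-in: only the allowed-symbols check.
    if not re.fullmatch("[vV.][fF.][dD.][pP.][xX.][aA.]", state):
        raise StateValidationError(f"Invalid state [{state}]")


def ensure_valid_state(func: Callable) -> Callable:
    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        is_valid_state(args[0])  # the state is expected as the first argument
        return func(*args, **kwargs)

    return wrapper


@ensure_valid_state
def describe(state: str) -> str:
    """Return a short description of a state string."""
    return f"state={state}"


print(describe("vfdpxa"))
print(describe.__name__)
```

Note that, as in the original, the state must be passed positionally: calling `describe(state="vfdpxa")` would leave `args` empty.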

ensure_valid_state_method_wrapper(func)

Method Decorator to ensure a valid state is passed to a method. Equivalent to ensure_valid_state, but for methods.

Example

class MyClass:
    @ensure_valid_state_method_wrapper
    def my_method(self, state, ...):
        ...

Parameters:

Name Type Description Default
func Callable

the method to decorate

required

Returns:

Type Description
Callable

the decorated method

Source code in vultron/case_states/validations.py
def ensure_valid_state_method_wrapper(func: Callable) -> Callable:
    """Method Decorator to ensure a valid state is passed to a method.
    Equivalent to ensure_valid_state, but for methods.

    Example:
        ```python
        class MyClass:
            @ensure_valid_state_method_wrapper
            def my_method(self, state, ...):
                ...
        ```

    Args:
        func: the method to decorate

    Returns:
        the decorated method
    """

    def wrapper(self, *args, **kwargs):
        # todo: this part would be cool to use, but it slows things down (doubled the time to run tests)
        # # get method default arguments and add them to the dict we are going to check
        # sig = inspect.signature(func)
        # bound = sig.bind(self, *args, **kwargs)
        # bound.apply_defaults()
        # bound_args = bound.arguments
        # bound_args.update(kwargs)

        # sometimes it's a kwarg called "state"
        # sometimes there are two states called "src" and "dst" or "start" and "end"
        states = []
        for k in ["state", "src", "dst", "start", "end"]:
            # if k in bound_args:
            #     states.append(bound_args[k])
            if k in kwargs:
                states.append(kwargs[k])

        # sometimes the state is the first arg,
        if len(states) == 0 and len(args) > 0:
            states.append(args[0])

        if not len(states) > 0:
            raise RuntimeError(
                "ensure_valid_state_method_wrapper: no state found in args or kwargs"
            )

        for state in states:
            try:
                is_valid_state(state)
            except StateValidationError as e:
                raise e

        return func(self, *args, **kwargs)

    return wrapper
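
Which arguments the wrapper actually validates follows from the selection logic above: recognized keyword arguments take priority, and the first positional argument is used only when none of them is present. The sketch below mirrors that selection in a standalone helper (`states_to_check` is a hypothetical name, and `args` here excludes `self`).

```python
from typing import Any, Dict, List, Tuple

STATE_KEYS = ["state", "src", "dst", "start", "end"]


def states_to_check(args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> List[Any]:
    """Mirror the wrapper's choice of which arguments get validated."""
    states = [kwargs[k] for k in STATE_KEYS if k in kwargs]
    if not states and args:
        states.append(args[0])  # fall back to the first positional argument
    if not states:
        raise RuntimeError("no state found in args or kwargs")
    return states


print(states_to_check(("vfdpxa",), {}))
print(states_to_check((), {"start": "vfdpxa", "end": "VFDPXA"}))
print(states_to_check(("vfdpxa", "VFDPXA"), {}))
print(states_to_check(("vfdpxa",), {"end": "VFDPXA"}))
```

Two consequences are visible in the last two calls: a second positional state is not checked, and a positional state is skipped when any recognized keyword is also given. Default values are never checked, since they appear in neither `args` nor `kwargs`.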

is_valid_history(h)

Validate a history string. Checks that the history is exactly six characters long, is all uppercase, and contains one each of V, F, D, P, X, and A. Also checks that the causally-related events are in the correct order:

  • V \(\prec\) F \(\prec\) D
  • P \(\prec\) X or XP
  • V \(\prec\) P or PV

Example

is_valid_history("VFDPXA")
succeeds, but
is_valid_history("VFDPX")
fails with a HistoryValidationError.

Parameters:

Name Type Description Default
h str

the history string to validate

required

Returns:

Type Description
None

None

Raises:

Type Description
HistoryValidationError

if the history is invalid

Source code in vultron/case_states/validations.py
def is_valid_history(h: str) -> None:
    """
    Validate a history string.
    Checks that the history is exactly six characters long, is all uppercase, and contains one each of V, F, D, P, X, and A.
    Also checks that the causally-related events are in the correct order:

    - V $\\prec$ F $\\prec$ D
    - P $\\prec$ X or XP
    - V $\\prec$ P or PV

    Example:
        ```python
        is_valid_history("VFDPXA")
        ```
        succeeds, but
        ```python
        is_valid_history("VFDPX")
        ```
        fails with a `HistoryValidationError`.

    Args:
        h: the history string to validate

    Returns:
        None

    Raises:
        HistoryValidationError: if the history is invalid
    """
    # a history has exactly six elements
    if len(h) != 6:
        raise HistoryValidationError("History must have 6 events")

    # a history is all uppercase
    if not h.isupper():
        raise HistoryValidationError("History must be all uppercase")

    # a history must contain one each of V, F, D, P, X, A
    for c in "VFDPXA":
        if c not in h:
            raise HistoryValidationError(f"History must contain event {c}")

    # V<F
    if h.index("V") > h.index("F"):
        raise HistoryValidationError("V must precede F")
    # F<D
    if h.index("F") > h.index("D"):
        raise HistoryValidationError("F must precede D")
    # P...X or XP
    if (h.index("P") - h.index("X")) > 1:
        raise HistoryValidationError(
            "P must precede X or immediately follow it"
        )
    # V...P or PV
    if (h.index("V") - h.index("P")) > 1:
        raise HistoryValidationError(
            "V must precede P or immediately follow it"
        )
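
To see what the ordering rules admit, they can be restated as a boolean predicate and applied to every permutation of the six events. This is a sketch that re-expresses the checks above (the helper name `history_ok` is hypothetical); it does not call the library.

```python
from itertools import permutations


def history_ok(h: str) -> bool:
    """Re-statement of the ordering rules as a predicate."""
    i = {c: h.index(c) for c in "VFDPXA"}
    return (
        i["V"] < i["F"] < i["D"]  # V precedes F precedes D
        and i["P"] - i["X"] <= 1  # P precedes X, or P immediately follows X
        and i["V"] - i["P"] <= 1  # V precedes P, or V immediately follows P
    )


histories = [
    "".join(p) for p in permutations("VFDPXA") if history_ok("".join(p))
]
print(len(histories))  # 70 of the 720 permutations pass
```

For example, `XPVFDA` passes because each of X, P, V is immediately followed by the event it triggers, while `FVDPXA` fails because F precedes V.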

is_valid_pattern(pat)

Validate a pattern string. Patterns are expected to be regular expressions with the following constraints:

  • The pattern is exactly six characters long.
  • The pattern contains either upper or lower case v, f, d, p, x, a, in that order, or a dot.

Examples:

is_valid_pattern("vfdpxa")
is_valid_pattern("Vfd.X.")
will succeed but
is_valid_pattern("xpdfva")
is_valid_pattern("vfDPx")
fail with a PatternValidationError.

Parameters:

Name Type Description Default
pat str

the pattern string to validate

required

Returns:

Type Description
None

None

Raises:

Type Description
PatternValidationError

if the pattern is invalid

Source code in vultron/case_states/validations.py
def is_valid_pattern(pat: str) -> None:
    """
    Validate a pattern string.
    Patterns are expected to be regular expressions with the following constraints:

    - The pattern is exactly six characters long.
    - The pattern contains either upper or lower case v, f, d, p, x, a, in that order, or a dot.

    Examples:
        ```python
        is_valid_pattern("vfdpxa")
        is_valid_pattern("Vfd.X.")
        ```
        will succeed but
        ```python
        is_valid_pattern("xpdfva")
        is_valid_pattern("vfDPx")
        ```
        fail with a `PatternValidationError`.

    Args:
        pat: the pattern string to validate

    Returns:
        None

    Raises:
        PatternValidationError: if the pattern is invalid
    """
    if pat is None:
        raise (PatternValidationError(f"Invalid Pattern [{pat}]"))

    if not len(pat) == 6:
        raise PatternValidationError(f"Invalid Pattern [{pat}]")

    for p, c in zip(pat.lower(), "vfdpxa"):
        if p == c:
            continue
        # if you got here, the chars don't match
        if p == ".":
            continue
        # and the pattern is not a dot at this char
        # so you have a problem
        raise PatternValidationError(f"Invalid Pattern [{pat}]")
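
For ASCII input, the position-by-position check above should accept the same strings as a single regular expression with one character class per position. The sketch below is an alternative formulation for illustration, not the library's code; `pattern_ok` is a hypothetical helper that returns a boolean instead of raising.

```python
import re

# One character class per position: the expected letter in either case, or a dot.
PATTERN_RE = re.compile(r"[vV.][fF.][dD.][pP.][xX.][aA.]")


def pattern_ok(pat) -> bool:
    """True if pat is a six-character pattern over v, f, d, p, x, a and dots."""
    return isinstance(pat, str) and PATTERN_RE.fullmatch(pat) is not None


for candidate in ["vfdpxa", "Vfd.X.", "xpdfva", "vfDPx"]:
    print(candidate, pattern_ok(candidate))
```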

is_valid_state(state)

Validate a state string. Checks that the state is a valid state pattern. Also checks that the state is not impossible (e.g. vF...., .fD...). Finally, verifies that only allowed symbols (vVfFdDpPxXaA.) are used.

Parameters:

Name Type Description Default
state str

the state string to validate

required

Returns:

Type Description
None

None

Raises:

Type Description
StateValidationError

if the state is invalid

Source code in vultron/case_states/validations.py
def is_valid_state(state: str) -> None:
    """Validate a state string.
    Checks that the state is a valid state pattern.
    Also checks that the state is not impossible (e.g. `vF....`, `.fD...`).
    Finally, verifies that only allowed symbols (`vVfFdDpPxXaA.`) are used.


    Args:
        state: the state string to validate

    Returns:
        None

    Raises:
        StateValidationError: if the state is invalid
    """
    # every valid state has to be a valid pattern too
    try:
        is_valid_pattern(state)
    except PatternValidationError as e:
        raise StateValidationError(e)

    # disqualify impossible states
    if re.match("vF....", state):
        raise StateValidationError(f"Invalid state [{state}]")
    if re.match(".fD...", state):
        raise StateValidationError(f"Invalid state [{state}]")
    if not re.match("[vV.][fF.][dD.][pP.][xX.][aA.]", state):
        raise StateValidationError(f"Invalid state [{state}]")
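
The effect of the two "impossible state" rules can be seen by enumerating all 64 fully specified state strings and filtering them. The sketch below restates the checks shown above as a predicate (`state_ok` is a hypothetical helper); it does not import the library.

```python
import re
from itertools import product


def state_ok(state: str) -> bool:
    """Re-statement of the checks above as a boolean predicate."""
    if not re.fullmatch("[vV.][fF.][dD.][pP.][xX.][aA.]", state):
        return False
    # impossible: fix ready while vendor unaware, fix deployed while not ready
    return not (re.match("vF....", state) or re.match(".fD...", state))


candidates = [
    "".join(chars) for chars in product("vV", "fF", "dD", "pP", "xX", "aA")
]
valid = [s for s in candidates if state_ok(s)]
print(len(candidates), len(valid))  # 64 32
```

Only four of the eight `vfd` combinations survive (`vfd`, `Vfd`, `VFd`, `VFD`), each paired with all eight `pxa` combinations. The checks shown here do not themselves reject states containing `pX`.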

is_valid_transition(src, dst, allow_null=False)

Validate a transition from src to dst

Parameters:

Name Type Description Default
src str

the source state

required
dst str

the destination state

required
allow_null bool

if True, allow null transitions (src==dst), default=False

False

Returns:

Type Description
None

None

Raises:

Type Description
TransitionValidationError

if the transition is invalid

Source code in vultron/case_states/validations.py
def is_valid_transition(src: str, dst: str, allow_null: bool = False) -> None:
    """Validate a transition from src to dst

    Args:
        src: the source state
        dst: the destination state
        allow_null: if True, allow null transitions (src==dst), default=False

    Returns:
        None

    Raises:
        TransitionValidationError: if the transition is invalid
    """
    try:
        is_valid_state(src)
    except StateValidationError as e:
        raise TransitionValidationError(e)

    try:
        is_valid_state(dst)
    except StateValidationError as e:
        raise TransitionValidationError(e)

    # compute hamming distance
    diff = [(c1, c2) for c1, c2 in zip(src, dst) if c1 != c2]
    HD = len(diff)

    # short circuit to allow null transition
    if allow_null and HD == 0:
        return

    # otherwise reject unless hamming distance is 1
    if HD != 1:
        raise TransitionValidationError("Only HD=1 transitions allowed")

    # changes must be from lc to uc
    c1, c2 = diff[0]
    if c1.isupper():
        raise TransitionValidationError("Transitions from UC not permitted")

    if c2.islower():
        raise TransitionValidationError("Transitions to lc not permitted")

    if c1.lower() != c2.lower():
        raise TransitionValidationError(f"Invalid transition [{c1}->{c2}]")

    for a, b in TRANSITION_RULES:
        if re.match(a, src):
            # if the first pattern matches,
            # then the second must as well
            if not re.match(b, dst):
                raise TransitionValidationError(
                    f"Transition not permitted [{a}->{b}]"
                )
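
The structural part of this check (exactly one position changes, and it changes from lowercase to uppercase) can be sketched on its own. This sketch covers fully specified states only and deliberately omits the `TRANSITION_RULES` pass, whose contents are not shown here; `single_forward_step` is a hypothetical helper that returns a boolean instead of raising.

```python
from typing import List, Tuple


def changed_positions(src: str, dst: str) -> List[Tuple[str, str]]:
    """Pairs of characters that differ between the two state strings."""
    return [(c1, c2) for c1, c2 in zip(src, dst) if c1 != c2]


def single_forward_step(src: str, dst: str) -> bool:
    """True if exactly one position flips from lowercase to uppercase."""
    diff = changed_positions(src, dst)
    if len(diff) != 1:  # Hamming distance must be 1
        return False
    c1, c2 = diff[0]
    return c1.islower() and c2.isupper() and c1.lower() == c2.lower()


print(single_forward_step("vfdpxa", "Vfdpxa"))  # one event occurs
print(single_forward_step("vfdpxa", "VFdpxa"))  # two events at once
print(single_forward_step("Vfdpxa", "vfdpxa"))  # an event is undone
```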

vultron.case_states.errors

vultron.case_states.errors provides error classes for the CVD State Model.

CvdStateModelError

Bases: VultronError

Base class for errors in the vultron.case_states module.

Source code in vultron/case_states/errors.py
class CvdStateModelError(VultronError):
    """Base class for errors in the `vultron.case_states` module."""