smonitor.policy package#

Submodules#

smonitor.policy.engine module#

class smonitor.policy.engine.PolicyEngine[source]#

Bases: object

Policy engine for routing/filtering/transforms.

apply(event, handlers)[source]#
Parameters:
  • event (Dict[str, Any])

  • handlers (Iterable[Any])

Return type:

Tuple[Dict[str, Any], List[Any]]

get_filters()[source]#
Return type:

List[Dict[str, Any]]

get_routes()[source]#
Return type:

List[Dict[str, Any]]

set_filters(filters)[source]#
Parameters:

filters (List[Dict[str, Any]])

Return type:

None

set_routes(routes)[source]#
Parameters:

routes (List[Dict[str, Any]])

Return type:

None

class smonitor.policy.engine.PolicyState(counters: Dict[str, int] = <factory>, last_seen: Dict[str, float] = <factory>)[source]#

Bases: object

Parameters:
  • counters (Dict[str, int])

  • last_seen (Dict[str, float])

counters: Dict[str, int]#
last_seen: Dict[str, float]#
smonitor.policy.engine.random() → x in the interval [0, 1).#

Module contents#