Skip to content

Core API

This page documents the core functions and classes of the Skarv package.

Classes

skarv.Sample dataclass

A data sample consisting of a key expression and its associated value.

Attributes:

Name Type Description
key_expr KeyExpr

The key expression associated with the sample.

value Any

The value of the sample.

Source code in skarv/__init__.py
@dataclass(frozen=True)
class Sample:
    """A data sample consisting of a key expression and its associated value.

    Attributes:
        key_expr (KeyExpr): The key expression associated with the sample.
        value (Any): The value of the sample.
    """

    key_expr: KeyExpr
    value: Any

skarv.Subscriber dataclass

A subscriber that listens to updates for a specific key expression.

Attributes:

Name Type Description
key_expr KeyExpr

The key expression to subscribe to.

callback Callable[[Any], None]

The callback function to invoke when a matching sample is published.

Source code in skarv/__init__.py
@dataclass(frozen=True)
class Subscriber:
    """A subscriber that listens to updates for a specific key expression.

    Attributes:
        key_expr (KeyExpr): The key expression to subscribe to.
        callback (Callable[[Any], None]): The callback function to invoke when a matching sample is published.
    """

    key_expr: KeyExpr
    callback: Callable[[Any], None]

skarv.Middleware dataclass

A middleware operator that processes values for a specific key expression.

Attributes:

Name Type Description
key_expr KeyExpr

The key expression the middleware applies to.

operator Callable[[Any], Any]

The operator function to process the value.

Source code in skarv/__init__.py
@dataclass(frozen=True)
class Middleware:
    """A middleware operator that processes values for a specific key expression.

    Attributes:
        key_expr (KeyExpr): The key expression the middleware applies to.
        operator (Callable[[Any], Any]): The operator function to process the value.
    """

    key_expr: KeyExpr
    operator: Callable[[Any], Any]

Functions

skarv.put(key: str, value: Any)

Store a value for a given key, passing it through any registered middlewares and notifying subscribers.

Parameters:

Name Type Description Default
key str

The key to associate with the value.

required
value Any

The value to store.

required
Source code in skarv/__init__.py
def put(key: str, value: Any):
    """Store a value for a given key, passing it through any registered middlewares and notifying subscribers.

    Args:
        key (str): The key to associate with the value.
        value (Any): The value to store.
    """
    ke: KeyExpr = KeyExpr.autocanonize(key)

    # Pass through middlewares
    for middleware in _find_matching_middlewares(key):
        value = middleware.operator(value)

        if value is None:
            return

    # Add final value to vault
    with _vault_lock:
        _vault[ke] = value

    # Trigger subscribers
    sample = Sample(ke, value)
    for subscriber in _find_matching_subscribers(key):
        subscriber.callback(sample)

skarv.subscribe(*keys: str)

Decorator to subscribe a callback to one or more keys.

Parameters:

Name Type Description Default
*keys str

One or more keys to subscribe to.

()

Returns:

Name Type Description
Callable

A decorator that registers the callback as a subscriber.

Source code in skarv/__init__.py
def subscribe(*keys: str):
    """Decorator to subscribe a callback to one or more keys.

    Args:
        *keys (str): One or more keys to subscribe to.

    Returns:
        Callable: A decorator that registers the callback as a subscriber.
    """
    logger.debug("Subscribing to: %s", keys)

    # Adding a new subscriber means we need to clear the cache
    _find_matching_subscribers.cache_clear()
    logger.debug("Cleared subscriber cache.")

    def decorator(callback: Callable):
        for key in keys:
            ke = KeyExpr.autocanonize(key)
            logger.debug("Adding internal Subscriber for %s", ke)
            _subscribers.add(Subscriber(ke, callback))

        return callback

    return decorator

skarv.get(key: str) -> List[Sample]

Retrieve all samples whose keys intersect with the given key.

Parameters:

Name Type Description Default
key str

The key to search for.

required

Returns:

Type Description
List[Sample]

List[Sample]: A list of matching samples.

Source code in skarv/__init__.py
def get(key: str) -> List[Sample]:
    """Retrieve all samples whose keys intersect with the given key.

    Args:
        key (str): The key to search for.

    Returns:
        List[Sample]: A list of matching samples.
    """
    logger.debug("Getting for %s", key)
    req_ke = KeyExpr.autocanonize(key)

    with _vault_lock:
        samples = [
            Sample(rep_ke, value)
            for rep_ke, value in _vault.items()
            if req_ke.intersects(rep_ke)
        ]

    return samples

skarv.register_middleware(key: str, operator: Callable[[Any], Any])

Register a middleware operator for a given key.

Parameters:

Name Type Description Default
key str

The key to associate with the middleware.

required
operator Callable[[Any], Any]

The operator function to process values.

required
Source code in skarv/__init__.py
def register_middleware(key: str, operator: Callable[[Any], Any]):
    """Register a middleware operator for a given key.

    Args:
        key (str): The key to associate with the middleware.
        operator (Callable[[Any], Any]): The operator function to process values.
    """
    logger.debug("Registering middleware on %s", key)
    ke = KeyExpr.autocanonize(key)
    _middlewares.add(Middleware(ke, operator))
    _find_matching_middlewares.cache_clear()