Skip to content

Utilities

This page documents the utility functions provided by Skarv.

Periodic Execution

skarv.utilities.call_every(seconds: float, wait_first: bool = False)

Decorator to repeatedly call a function every specified number of seconds.

Parameters:

Name Type Description Default
seconds float

The interval in seconds between calls.

required
wait_first bool

If True, wait for the interval before the first call. Defaults to False.

False

Returns:

Name Type Description
Callable

A decorator that schedules the function to be called periodically.

Source code in skarv/utilities/__init__.py
def call_every(
    seconds: float,
    wait_first: bool = False,
):
    """Decorator to repeatedly call a function every specified number of seconds.

    Args:
        seconds (float): The interval in seconds between calls.
        wait_first (bool, optional): If True, wait for the interval before the first call. Defaults to False.

    Returns:
        Callable: A decorator that schedules the function to be called periodically.
    """

    def timed_task_decorator(func: Callable) -> Callable:

        is_coroutine = asyncio.iscoroutinefunction(func)

        async def timer():

            if wait_first:
                await asyncio.sleep(seconds)

            while True:
                t_0 = time.time()

                try:
                    if is_coroutine:
                        await func()
                    else:
                        await asyncio.get_event_loop().run_in_executor(None, func)
                except Exception:  # pylint: disable=broad-except
                    logger.exception(f"call_every: Exception in {func}")

                remainder = seconds - (time.time() - t_0)

                if remainder < 0:
                    warnings.warn(
                        f"Function {func} has an execution time the exceeds"
                        f" the requested execution interval of {seconds}s!",
                        UserWarning,
                    )

                await asyncio.sleep(max(remainder, 0))

        # Put `timer` on the event loop on service startup
        schedule_coroutine(timer())
        return func

    return timed_task_decorator

Zenoh Integration

skarv.utilities.zenoh.mirror(zenoh_session: zenoh.Session, zenoh_key: str, skarv_key: str)

Mirror a Zenoh key expression to a Skarv key.

Subscribes to the Zenoh key expression and automatically puts received values into Skarv. If the Zenoh key already has a value, it fetches and stores it in Skarv (if not already present).

Parameters:

Name Type Description Default
zenoh_session Session

The Zenoh session to use.

required
zenoh_key str

The Zenoh key expression to subscribe to.

required
skarv_key str

The Skarv key to store values in.

required
Source code in skarv/utilities/zenoh.py
def mirror(zenoh_session: zenoh.Session, zenoh_key: str, skarv_key: str):
    """Mirror a Zenoh key expression to a Skarv key.

    Subscribes to the Zenoh key expression and automatically puts received values into Skarv.
    If the Zenoh key already has a value, it fetches and stores it in Skarv (if not already present).

    Args:
        zenoh_session (zenoh.Session): The Zenoh session to use.
        zenoh_key (str): The Zenoh key expression to subscribe to.
        skarv_key (str): The Skarv key to store values in.
    """
    # Subscribe to the key expression and put the received value into skarv
    zenoh_session.declare_subscriber(
        zenoh_key, lambda sample: put(skarv_key, sample.payload)
    )

    # If the key expression already has a value, we fetch it and put it into skarv
    for response in zenoh_session.get(zenoh_key):
        if (ok_response := response.ok) and not get(skarv_key):
            put(skarv_key, ok_response.payload)