Python decorator usage

# Example 1: logging.
import datetime

def log(func):
    """Decorator that appends a record of every call to ``logs.txt``.

    Each record holds the wrapped function's name, its positional and keyword
    arguments, and the time of the call. The call is then forwarded to
    ``func`` and its return value is passed through unchanged.
    """
    import functools  # local import so the file's import block stays untouched

    # Fall back to repr() for callables without __name__ (e.g. partial objects).
    func_name = getattr(func, "__name__", repr(func))

    @functools.wraps(func)  # keep func's __name__/__doc__ on the wrapper
    def wrapper(*args, **kwargs):
        call_args = [str(arg) for arg in args]
        call_args += [f"{key}={value}" for key, value in kwargs.items()]
        with open("logs.txt", "a") as f:
            f.write(
                f"Called {func_name} with {' '.join(call_args)}"
                f" at {datetime.datetime.now()}\n"
            )
        return func(*args, **kwargs)
    return wrapper

# Same as writing run = log(run)
@log
def run(a, b, c=9):
    """Print the sum of a, b and c (c defaults to 9)."""
    total = a + b + c
    print(total)

run(1, 3)

# Example 2: class singleton instance.
def singleton(cls):
    """Class decorator that makes ``cls`` yield a single shared instance.

    The name bound by the decorator becomes a zero-argument factory: the first
    call constructs ``cls()``, and every later call returns that same object.
    """
    instances = {}

    def getinstance():
        # Create the instance lazily, on the first request only.
        if cls not in instances:
            instances[cls] = cls()
        return instances[cls]

    return getinstance

# After decoration, the name MyClass refers to the callable returned by
# singleton(), not to the class itself.
@singleton
class MyClass:
    ...
# Example 3: add attributes to a function.
def attrs(**kwds):
    """Build a decorator that stores each keyword argument as an attribute.

    The decorated function is returned unchanged apart from the attributes
    set on it.
    """
    def decorate(f):
        for name, value in kwds.items():
            setattr(f, name, value)
        return f

    return decorate

# attrs() sets mymethod.versionadded and mymethod.author via setattr().
@attrs(versionadded="2.2",
       author="Guido van Rossum")
def mymethod(f):
    ...

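If the attribute does not exist yet, setattr() creates it and assigns the value to it. However, this is only possible if the object supports arbitrary attribute assignment, i.e. it has a __dict__ attribute (__dict__ is an attribute that holds the object's namespace, not a method).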

# Example 4: enforce function argument and return types.
def accepts(*types):
    """Decorator factory that asserts the types of positional arguments.

    ``types`` holds one entry per positional parameter of the decorated
    function; each entry is anything ``isinstance`` accepts as its second
    argument (a type or a tuple of types). Keyword arguments are forwarded
    without being checked.

    Raises:
        AssertionError: at decoration time if the number of ``types`` differs
            from the function's positional parameter count, and at call time
            if a positional argument fails its ``isinstance`` check.
    """
    import functools
    import inspect

    def check_accepts(f):
        # Python 3 spells func_code as __code__. Unwrapping first lets this
        # decorator sit on top of other functools.wraps-based decorators,
        # whose (*args, **kwds) wrappers would otherwise report 0 parameters.
        assert len(types) == inspect.unwrap(f).__code__.co_argcount

        @functools.wraps(f)  # replaces the Python 2-only func_name copy
        def new_f(*args, **kwds):
            for (a, t) in zip(args, types):
                assert isinstance(a, t), \
                       "arg %r does not match %s" % (a, t)
            return f(*args, **kwds)
        return new_f
    return check_accepts

def returns(rtype):
    """Decorator factory that asserts the type of the return value.

    ``rtype`` is anything ``isinstance`` accepts as its second argument
    (a type or a tuple of types).

    Raises:
        AssertionError: at call time if the wrapped function's result is not
            an instance of ``rtype``.
    """
    import functools

    def check_returns(f):
        @functools.wraps(f)  # replaces the Python 2-only func_name copy
        def new_f(*args, **kwds):
            result = f(*args, **kwds)
            assert isinstance(result, rtype), \
                   "return value %r does not match %s" % (result, rtype)
            return result
        return new_f
    return check_returns

# Decorators apply bottom-up: returns() wraps func first, then accepts()
# wraps that result.
@accepts(int, (int,float))
@returns((int,float))
def func(arg1, arg2):
    return arg1 * arg2
# Example 5: Declare that a class implements a particular (set of) interface(s).
def provides(*interfaces):
    """Return a class decorator declaring that instances provide `interfaces`.

    The decorator hands the class to ``declareImplementation`` (expected to
    come from PyProtocols; it is not imported in this snippet) with
    ``instancesProvide=interfaces`` and returns the class unchanged.
    """
    def register(typ):
        declareImplementation(typ, instancesProvide=interfaces)
        return typ

    return register

# NOTE(review): Interface is not defined or imported in this snippet —
# presumably the PyProtocols interface base class; confirm.
class IBar(Interface):
     """Declare something about IBar here"""

# provides(IBar) passes Foo to declareImplementation(...) and returns the
# class unchanged.
@provides(IBar)
class Foo(object):
        """Implement something here..."""

Common decorators in Python:

  • @property
  • @classmethod
  • @staticmethod
class Mass:
    """A mass stored in kilograms, with conversions to and from pounds."""

    # Single source for the conversion factor used by pounds and from_pounds.
    POUNDS_PER_KILO = 2.205

    def __init__(self, kilos):
        self.kilos = kilos

    @property
    def pounds(self):
        """The mass expressed in pounds (computed on access)."""
        return self.kilos * self.POUNDS_PER_KILO

    @classmethod
    def from_pounds(cls, pounds):
        """Alternate constructor: build an instance from a value in pounds."""
        # convert pounds to kilos
        kilos = pounds / cls.POUNDS_PER_KILO
        # cls is the class the method was called on, so for Mass.from_pounds
        # calling cls(kilos) is the same as Mass(kilos).
        return cls(kilos)

    @staticmethod
    def conversion_info():
        """Print how kilos are converted to pounds."""
        # Keep the factor in this message in sync with POUNDS_PER_KILO.
        print("Kilos are converted to pounds by multiplying by 2.205.")

A static method is tied to the class, not to its instance. This may remind you of a class method, but the key difference is that a static method receives neither the instance (self) nor the class (cls) as an implicit first argument, so it has no implicit access to instance or class state and doesn’t modify the class at all.

Type annotation and protocol usage

# Example 1: protocol
from typing import Protocol

class HasBirthYear(Protocol):
    """Structural type: any object exposing ``get_birthyear() -> int``."""
    # Use ellipsis (...) as the function body.
    def get_birthyear(self) -> int: ...

class Person:
    """A person with a name and a birth year.

    Defines ``get_birthyear() -> int`` without inheriting from any protocol
    class, which is what lets it match a structural type with that method.
    """

    def __init__(self, name: str, birthyear: int) -> None:
        self.name = name
        self.birthyear = birthyear

    def get_birthyear(self) -> int:
        """Return the stored birth year."""
        return self.birthyear

def calc_age(current_year: int, data: HasBirthYear) -> int:
    """Return current_year minus the birth year reported by data."""
    birth_year = data.get_birthyear()
    return current_year - birth_year

john = Person("john doe", 1996)
# Example 2: Add type hints for Iterable class.
# Iterable type that implements the __iter__ method.
from collections.abc import Iterable

def double_elements(items: Iterable[int]) -> list[int]:
    """Return a new list holding each element of items multiplied by 2."""
    doubled: list[int] = []
    for value in items:
        doubled.append(value * 2)
    return doubled

print(double_elements([2, 4, 6])) # list
print(double_elements((2, 4)))     # tuple
# Example 3: Add type hints for Sequence class.
# Sequence type that have special methods: __getitem__ and __len__.
from collections.abc import Sequence

def get_last_element(data: Sequence[int]) -> int:
    """Return the final element of data.

    Raises:
        IndexError: if data is empty.
    """
    return data[-1]

first_item = get_last_element((3, 4, 5))    # 5
second_item = get_last_element([3, 8])    # 8
# Example 4: Add type hints for Mapping class.
# Mapping type that implements the following methods:
#   __getitem__: for accessing an element
#   __iter__: for iterating
#   __len__: computing the length
from collections.abc import Mapping

def get_full_name(student: Mapping[str, str]) -> str:
    """Return "<first_name> <last_name>" built from the student mapping.

    A missing key is rendered as the text "None" because the values are read
    with ``Mapping.get``.
    """
    first = student.get("first_name")
    last = student.get("last_name")
    return f"{first} {last}"

john = {
  "first_name": "John",
  "last_name": "Doe",
}

get_full_name(john)
# Example 5: Add type hints for MutableMapping class.
# Mapping type that implements the following methods:
#   __getitem__: for accessing an element
#   __setitem__: for setting an element
#   __delitem__: for deleting an element
#   __iter__: for iterating
#   __len__: computing the length
from collections.abc import MutableMapping

def update_first_name(student: MutableMapping[str, str], first_name: str) -> None:
    """Set student["first_name"] to first_name, mutating student in place."""
    student["first_name"] = first_name

john = {
    "first_name": "John",
    "last_name": "Doe",
}

update_first_name(john, "james")
# Example 6: Add type hints to tuples
# Annotate a tuple with two elements
student: tuple[str, int] = ("John Doe", 18)
# Annotate a tuple with an unknown amount of elements of a similar type
letters: tuple[str, ...] = ('a', 'h', 'j', 'n', 'm', 'n', 'z')

# Annotate a tuple with a named type
from typing import NamedTuple

class StudentTuple(NamedTuple):
    """Named tuple with a str ``name`` field followed by an int ``age`` field."""
    name: str
    age: int

john = StudentTuple("John Doe", 33)
# Example 7: Add type hints to TypedDict.
from typing import TypedDict

class StudentDict(TypedDict):
    """Dict shape for a student: names, an int age and a list of hobby strings."""
    first_name: str
    last_name: str
    age: int
    hobbies: list[str]

student1: StudentDict = {
    "first_name": "John",
    "last_name": "Doe",
    "age": 18,
    "hobbies": ["singing", "dancing"],
}
# Example 8: Add type hints for a union type.
def show_type(num: str | int):
    if(isinstance(num, str)):
        print("You entered a string")
    elif (isinstance(num, int)):
        print("You entered an integer")

show_type('hello') # You entered a string
show_type(3)       # You entered an integer
# Example 9: overloaded function
from typing import overload

# Decorator: @overload
@overload
def add_number(value: int, num: int) -> int: ...

@overload
def add_number(value: list, num: int) -> list: ...

def add_number(value, num):
    """Add num to value.

    An int is returned as ``value + num``; a list is returned as a new list
    with num added to every element. Any other type falls through and
    returns None.
    """
    if isinstance(value, int):
        return value + num
    elif isinstance(value, list):
        return [i + num for i in value]

print(add_number(3, 4))
print(add_number([1, 2, 5], 4))
# Example 10: Add type hints for optional parameters
def format_name(name: str, title: Optional[str] = None) -> str:
    if title:
        return f"Name: {title}. {name.title()}"
    else:
        return f"Name: {name.title()}"

format_name("john doe", "Mr")

Pytest

The pytest framework makes it easy to write small, readable tests, and can scale to support complex functional testing for applications and libraries. Pytest is equipped with metabuild ability so that it can compile the test files as desired.

What are fixtures? We can tell pytest that a particular function is a fixture by decorating it with @pytest.fixture. Test functions acquire fixtures by declaring them as arguments. Pytest also has several useful built-in fixtures.

How to parametrize fixtures and test functions

# run all the tests in a repo
pytest

# run a specific test
cd /path/to/code/
pytest -v test_file.py::test_case
# content of test_expectation.py
import pytest

# pytest mark a function
@pytest.mark.parametrize("test_input,expected", [("3+5", 8), ("2+4", 6), ("6*9", 42)])
def test_eval(test_input, expected):
    # Runs once per (test_input, expected) pair. The third pair fails:
    # eval("6*9") is 54, not 42.
    # eval() is acceptable for these fixed literals; never use it on
    # untrusted input.
    assert eval(test_input) == expected

# pytest mark a class
@pytest.mark.parametrize("n,expected", [(1, 2), (3, 4)])
class TestClass:
    # The class-level mark supplies the (n, expected) pairs to every test
    # method below.
    def test_simple_case(self, n, expected):
        assert n + 1 == expected

    def test_weird_simple_case(self, n, expected):
        # Same check as above; multiplying by 1 leaves n unchanged.
        assert (n * 1) + 1 == expected
import pytest
# global pytest mark which parametrize all tests in a module
pytestmark = pytest.mark.parametrize("n,expected", [(1, 2), (3, 4)])

class TestClass:
    # n and expected are supplied by the module-level pytestmark parametrize.
    def test_simple_case(self, n, expected):
        assert n + 1 == expected

    def test_weird_simple_case(self, n, expected):
        # Same check as above; multiplying by 1 leaves n unchanged.
        assert (n * 1) + 1 == expected
# mark individual test instances
import pytest

# use built-in xfail to set an expected failure test.
@pytest.mark.parametrize(
    "test_input,expected",
    # The "6*9" case (54, not 42) is wrapped in pytest.param so that only
    # this one parameter set carries the xfail mark.
    [("3+5", 8), ("2+4", 6), pytest.param("6*9", 42, marks=pytest.mark.xfail)],
)
def test_eval(test_input, expected):
    # eval() is acceptable for these fixed literals; never use it on
    # untrusted input.
    assert eval(test_input) == expected

Module import

Module import using importlib and pkgutil.extend_path to handle separate source file folders under a same package/module name:

# Examine the current imported modules and paths
import sys
import importlib
sys.modules
sys.path_importer_cache

1) Manual import workaround:

import sys
import importlib
# "import importlib" alone is not guaranteed to load its submodules, so the
# two used below are imported explicitly.
import importlib.machinery
import importlib.util

pkgname = 'clio'

# Ask every cached FileFinder for the package and merge all locations found
# into a single sys.modules entry. Iterate over a snapshot so the cache may
# change while specs are being processed.
for key, finder in list(sys.path_importer_cache.items()):
  if isinstance(finder, importlib.machinery.FileFinder):
    spec = finder.find_spec(pkgname)

    if spec is not None:
      print(f'Adding spec {spec} and exec module')
      modl = importlib.util.module_from_spec(spec)

      # Same guard as the sub-package loop: LazyLoader needs a real loader.
      assert spec.loader is not None, "loader can't be None"
      loader = importlib.util.LazyLoader(spec.loader)
      spec.loader = loader
      spec.loader.exec_module(modl)

      if pkgname not in sys.modules:
        # First location found: register the module.
        sys.modules[pkgname] = modl
      else:
        # Later locations: extend the already-registered package's search path.
        sys.modules[pkgname].__path__ += spec.submodule_search_locations


print(f'sys.modules[{pkgname}] = {sys.modules[pkgname]}')
print(f'sys.modules[{pkgname}].__path__ = {sys.modules[pkgname].__path__}')

# Repeat the merge for a sub-package, searching every location collected in
# the parent package's __path__.
subpkgname = 'core_models'
subpkgfullname = pkgname + "." + subpkgname

print(f'Pre-Import is key {subpkgfullname} in sys.modules: {subpkgfullname in sys.modules}')

for pth in sys.modules[pkgname].__path__:
  # NOTE(review): FileFinder is built without loader_details here — confirm it
  # can return a spec with a loader for a regular package in pth.
  finder = importlib.machinery.FileFinder(pth)
  spec = finder.find_spec(subpkgfullname)

  if spec is not None:
      print(f'Adding spec {spec} and exec module')
      modl = importlib.util.module_from_spec(spec)

      assert spec.loader is not None, "loader can't be None"
      loader = importlib.util.LazyLoader(spec.loader)
      spec.loader = loader
      spec.loader.exec_module(modl)

      # First location registers the module; later ones extend its __path__.
      if not subpkgfullname in sys.modules:
        sys.modules[subpkgfullname] = modl
      else:
        sys.modules[subpkgfullname].__path__ += spec.submodule_search_locations

# importlib.import_module doesn't deal with multiple locations
# submodl = importlib.import_module(subpkgfullname)

# print(submodl)
# print(f"dir(submodl of {submodl.__name__})={dir(submodl)}")

print(f'PostImport sys.modules[{subpkgfullname}].__path__ = {sys.modules[subpkgfullname].__path__}')

# now import what we need
# import clio.ait_models.unets
# import clio.core_models.unets
import clio.export.aitemplate_export

2) Recommended solution: To make Python aware of the situation (an identical package name spread over different file paths), use pkgutil.extend_path in the __init__.py of the first imported module to configure the import behavior.

Put the following code in the __init__.py of the package folder that will be loaded first under the shared parent package name:

from pkgutil import extend_path
# Extend path to include all module paths with an identical parent package name
__path__ = extend_path(__path__, __name__)

For example, we put the above code in the following files:

  1. cliocore/ait_models/unets/src/clio/__init__.py
  2. cliocore/core_models/autoencoders/src/clio/core_models/__init__.py

Then the path for the loaded module will be the list of all the modules under the same name.

AsyncRunner

import asyncio
import threading
from collections.abc import Coroutine
from typing import Any

import gevent

class AsyncRunner:
    """Run coroutines on one shared asyncio loop hosted in a background thread.

    All state is class-level, so every caller shares the same loop, lock and
    thread. Call start_event_loop() before run().
    """

    # Created once, when the class body is executed.
    loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
    # Guards creation and teardown of the loop thread.
    lock: threading.Lock = threading.Lock()
    # Dedicated thread to handle the event loop.
    thread: threading.Thread | None = None

    @classmethod
    def start_event_loop(cls) -> None:
        """Start the daemon thread running loop.run_forever(), if not started."""
        with cls.lock:
            if cls.thread is None:
                cls.thread = threading.Thread(target=cls.loop.run_forever, daemon=True)
                cls.thread.start()

    @classmethod
    def stop_event_loop(cls) -> None:
        """Stop the loop and join its thread; does nothing if not running."""
        with cls.lock:
            if cls.thread is not None:
                # loop.stop() is scheduled through call_soon_threadsafe because
                # the loop is running in the other thread.
                cls.loop.call_soon_threadsafe(cls.loop.stop)
                cls.thread.join()
                cls.thread = None

    @classmethod
    def run(cls, coro: Coroutine[Any, Any, Any]) -> Any:
        """Submit coro to the loop, wait on a gevent Event, return its result.

        fu.result() re-raises whatever exception the coroutine raised.
        """
        # start_event_loop() must have been called first. Note that assert
        # statements are skipped when Python runs with -O.
        assert cls.thread is not None
        
        fu = asyncio.run_coroutine_threadsafe(coro, cls.loop)

        # NOTE(review): only "import gevent" is visible in this file — confirm
        # gevent.event is loaded as a side effect, or import it explicitly.
        ev = gevent.event.Event()
        # NOTE(review): the callback presumably fires in the loop thread —
        # confirm a gevent Event may be set from another OS thread.
        fu.add_done_callback(lambda _: ev.set())
        ev.wait()

        return fu.result()

MISC

Ruff extension for VSCode as a quick linter:

ruff check --exclude clio-docker --ignore ANN,D . 
ruff check --exclude clio-docker --select UP . [--fix]

ruff rule S101

mypy for annotation check. (Check rules installed in pyproject.toml)

mypy /path/to/dir

Print Env var LD_LIBRARY_PATH in python:

import os, sys
print(os.environ['LD_LIBRARY_PATH'])

References