🐍 Python Once a Week
Everyday Context Manager Recipes2019-12-03

This post is designed to encourage you to use context managers in your workflow to show you the utility of writing your own context managers.

First and foremost context managers are used to manage resources. Let's take the most prolific and canonical of why we use context managers in python:

fd = open('/path/to/file.txt')
raw = fd.read()
# operate on fd ...
print(raw)
fd.close()

# keep running our program

If something goes wrong when we operate on fd then an Exception will bubble up and leave us with a unclosed and dangling file. This is problematic. In windows, only one process can (typically) open a file at a time. In all operating systems opening a file also consumes memory and your limit on file descriptors of which most processes only have a finite amount of it. It's generally great practice to close resources when you're no longer using them.

We could cleverly use a try/except/finally block to mitigate this problem (and that's a great way to go about this), or we can use context managers.

with open('/path/to/file.txt') as fd:
    raw = fd.read()
    # operate on fd
    print(raw)
    
# keep running our program

Once we're out of the the indented section, below the with clause and above # keep running our program, the open(...) context manager will close fd on our behalf. It does this whenever we go out of the indented section, by any means, even if an exception occurred. And keep in mind exiting a program by using Ctrl-C also bubbles an exception up, guaranteeing the file descriptor to be closed.

In this way context managers help us manage file resources, but in your day to day work you'll find a bunch of other kinds of resources: databases, process pools, servos, motors. Having a clear scope in which the resource is active is an incredibly useful concept, and the syntactic sugar python gives is super handy.

I won't deep dive into the intricacies on context managers; For that, I would go spelunking through python's docs in roughly this order:

Instead, I want to give some custom uses of context managers I've found helpful through the years to get you more comfortable with context managers after you've already been exposed to them.

Simple Timing Utility Function

from contextlib import contextmanager
import time

@contextmanager
def timeit(title):
    start = time.time()
    yield
    delta = time.time() - start
    print(f'{title}: {delta:.3f}s')
    
    
"""
Example Code Below
"""
    
with timeit('tester'):
    list(range(10000))

@timeit('foo')
def foo():
    list(range(100000))
    print('function foo')

foo()
tester: 0.001s
function foo
foo: 0.008s

If you want to turn this into a more serious function, I'd certainly replace print with python's logging module. If you're doing multiprocess/mulithreading code, I would also add the current process/thread id in the logging statement. However, for simple timing debug statements, this is a nice short hand.

SQLAlchemy

"""
Manage sessions with sqlalchemy.

https://docs.sqlalchemy.org/en/13/orm/session_basics.html#when-do-i-construct-a-session-when-do-i-commit-it-and-when-do-i-close-it
"""
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

# configure Session class with desired options
Session = sessionmaker()

# Elsewhere ...
engine = create_engine('...')
# associate it with our custom Session class
Session.configure(bind=engine)

# work with the session
@contextmanager
def session_scope():
    """Provide a transactional scope around a series of operations."""
    session = Session()
    try:
        yield session
        session.commit()
    except:
        session.rollback()
        raise
    finally:
        session.close()

If you follow my databasing in python primer, you'll see how this can come in handy. Essentially by doing something like:

with session_scope() as session:
    session.add(...)
    session.query(...)
    # etc ...

we can ensure that when session goes out of scope our connection with the database is in a clean-state. Either because our operations were succesful and we comitted and closed the session or because we rollbacked our session state in the case of an Exception. This is an awesome one that more folks should know about.

Output Redirection

from contextlib import contextmanager
import io
import sys


@contextmanager
def capture_stdout(redirect=None):
    """
    Similar to https://docs.python.org/3/library/contextlib.html#contextlib.redirect_stdout
    """
    previous_stdout = sys.stdout
    buffer = redirect or io.StringIO()
    sys.stdout = buffer
    yield sys.stdout
    sys.stdout = previous_stdout
    
"""
Example Code Below
"""

with capture_stdout() as buffer:
    print('foo bar zap')
    print('stroopenwafle')

print(f'buffer was {buffer.getvalue()}')
buffer was foo bar zap
stroopenwafle

This particular one has been helpful in a number of weird cases. One other cool thing you can do with capturing stdout is adding color to your print statements. Check this link out https://misc.flogisoft.com/bash/tip_colors_and_formatting.

Mutual Exclusion Locks when Threading

import threading
import functools
from contextlib import contextmanager
from collections import defaultdict

@contextmanager
def synchronized(obj, __lock__=threading.RLock(), __cache__=defaultdict(threading.RLock)):
    # NOTE: This will not work for multiprocessing, only threading
    # Protect from multiple threads creating differen locks for the same object
    with __lock__:
        # grab this object's unique lock
        # id(obj) returns a unique identifier for the object instance
        lock = __cache__[id(obj)]
    
    # Use the lock, return the object
    with lock:
        yield obj
        
"""
Example Code Below
"""
        
data = dict(foo='bar')


def tester(data):
    print(f'thread id {threading.current_thread().ident} | id {id(data)} | data {data}')
    with synchronized(data) as data:
        data['foo'] = threading.current_thread().ident
        data[threading.current_thread().ident] = id(data)


with synchronized(data) as data:
    print(data)
    
thread_a = threading.Thread(target=tester, args=(data,))
thread_b = threading.Thread(target=tester, args=(data,))
thread_a.start()
thread_b.start()

thread_a.join()
thread_b.join()
tester(data)
{'foo': 'bar'}
thread id 139956114671360 | id 139956375107192 | data {'foo': 'bar'}
thread id 139956114671360 | id 139956375107192 | data {'foo': 139956114671360, 139956114671360: 139956375107192}
thread id 139956717655872 | id 139956375107192 | data {'foo': 139956114671360, 139956114671360: 139956375107192}

Here we create a unique Lock for each object passed into synchronized. Note that we use the built-in id function to get a unique identifier for the given object. Python docs tell us that this is the object's address in memory, so this will work across threads (and for the same reason won't work across processes, since processes in python do not share a memory address). This is definitely short-hand; but if you have this in your project's utility module, it can be a handy one!

Multiprocessing Pool Manager

"""
Lifetime manager for multiprocessing Pool objects.

For Python < 3.3 only.

In Python >= 3.3 Pools already this capability
https://docs.python.org/3.8/library/multiprocessing.html
"""

import functools
import multiprocessing
from contextlib import contextmanager

@contextmanager
def lifeguard(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    try:
        yield pool
        # Ensure no more jobs are committed, we need to call this to call `join`
        pool.close()
        # Wait until all workers are done by the time we exit 
        # This will block the pool until all workers are done
        # So by the time we exit the pool scope, we know there are no dangling workers.
        pool.join()
    except Exception:
        # Should be called when pool goes out of scope
        # But let's do it here again to be explicit
        pool.terminate()
        raise

This one isn't useful for modern versions of python (since modern versions have this functionality built into Pool!), but much like the SQLAlchemy can be useful if you're in the situation where you can't immediately jump to python > 3.2 but still want to improve you code.

Conclusion

I hope that was helpful to you. If you have your own helpful context managers, let me know! Have a wonderful day 😁.