Python etc
Regular tips about Python and programming in general

Owner — @pushtaev

© CC BY-SA 4.0 — mention if repost
I often find myself writing a context manager to temporarily change the current working directory:

import os
from contextlib import contextmanager

@contextmanager
def enter_dir(path):
    # remember the old directory and always restore it
    old_path = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(old_path)


Since Python 3.11, a context manager with the same behavior is available as contextlib.chdir:

import os
from contextlib import chdir

print('before:', os.getcwd())
# before: /home/gram
with chdir('/'):
    print('inside:', os.getcwd())
    # inside: /
print('after:', os.getcwd())
# after: /home/gram
The typing.assert_type function (added in Python 3.11) does nothing at runtime, like most things in the typing module. However, if the type of the first argument doesn't match the type provided as the second argument, the type checker will report an error. It can be useful for writing simple "tests" for your library to ensure it is well annotated.

For example, say you have a library that defines a lot of decorators, like this:

from typing import Callable, TypeVar

C = TypeVar('C', bound=Callable)

def good_dec(f: C) -> C:
    return f

def bad_dec(f) -> Callable:
    return f


We want to be 100% sure that all decorators preserve the original type of decorated function. So, let's write a test for it:

from typing import Callable, assert_type

@good_dec
def f1(a: int) -> str: ...

@bad_dec
def f2(a: int) -> str: ...

assert_type(f1, Callable[[int], str]) # ok
assert_type(f2, Callable[[int], str]) # not ok
PEP 681 (landed in Python 3.11) introduced the typing.dataclass_transform decorator. It can be used to mark a class that behaves like a dataclass. The type checker will assume it has an __init__ that accepts the annotated attributes as arguments, as well as __eq__, __ne__, and __str__. For example, it can be used to annotate SQLAlchemy or Django models, attrs classes, pydantic validators, and so on. It's useful not only for libraries that don't provide a mypy plugin but also if you use a non-mypy type checker. For instance, pyright, which is used by the VS Code Python extension to show types, highlight syntax, provide autocomplete, and so on.
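
Here is a minimal sketch of how a library could use it (the model decorator is a made-up name; at runtime it simply reuses dataclasses.dataclass):

import dataclasses
from typing import TypeVar, dataclass_transform

T = TypeVar('T')

@dataclass_transform()
def model(cls: type[T]) -> type[T]:
    # at runtime, reuse dataclasses to synthesize __init__ and friends;
    # @dataclass_transform tells type checkers about this behavior
    return dataclasses.dataclass(cls)

@model
class User:
    name: str
    age: int = 0

User(name='Guido', age=67)  # the type checker knows this signature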
As we covered 3 years ago (gosh, the channel is old), if a method of a base class returns an instance of the current class, a TypeVar should be used as the annotation:

from typing import TypeVar

U = TypeVar('U', bound='BaseUser')

class BaseUser:
    @classmethod
    def new(cls: type[U]) -> U:
        ...

    def copy(self: U) -> U:
        ...

That's quite verbose, but it's how it should be done for the return type to be correct for inherited classes.

PEP 673 (landed in Python 3.11) introduced a new type Self that can be used as a shortcut for exactly such cases:

from typing import Self

class BaseUser:
    @classmethod
    def new(cls) -> Self:
        ...

    def copy(self) -> Self:
        ...
The reveal_type function doesn't exist at runtime. However, if you call it anyway and then run a type checker (like mypy or pyright) on the file, it will show the type of the passed object:

a = 1
reveal_type(a)
reveal_type(len)

Now, let's run mypy:

$ mypy tmp.py
tmp.py:2: note: Revealed type is "builtins.int"
tmp.py:3: note: Revealed type is "def (typing.Sized) -> builtins.int"

It's quite helpful to see what type mypy inferred for the variable in some tricky cases.

For convenience, the reveal_type function was also added to the typing module in Python 3.11:

from typing import reveal_type
a = 1
reveal_type(a)
# prints: Runtime type is 'int'
reveal_type(len)
# prints: Runtime type is 'builtin_function_or_method'

And for the curious, here is (roughly) the definition:

import sys
from typing import TypeVar

T = TypeVar('T')

def reveal_type(__obj: T) -> T:
    print(
        f"Runtime type is {type(__obj).__name__!r}",
        file=sys.stderr,
    )
    return __obj
PEP 675 (landed in Python 3.11) introduced a new type, typing.LiteralString. It matches any Literal type, which is the type of explicit literals and constants in the code. The PEP shows a very good example of how it can be used to implement a SQL driver with type-checker-level protection against SQL injections:

from typing import LiteralString, Final

def run_query(sql: LiteralString): ...

run_query('SELECT * FROM students') # ok

ALL_STUDENTS: Final = 'SELECT * FROM students'
run_query(ALL_STUDENTS) # ok

arbitrary_query = input()
run_query(arbitrary_query) # type error, don't do that
The isinstance function checks whether an object is an instance of a class or of a subclass thereof:

class A: pass
class B(A): pass
b = B()
isinstance(b, B) # True
isinstance(b, A) # True
isinstance(b, object) # True
isinstance(b, str) # False
isinstance(str, type) # True


Type-checkers understand isinstance checks and use them to refine the type:

a: object
reveal_type(a)
# ^ Revealed type is "builtins.object"
if isinstance(a, str):
    reveal_type(a)
    # ^ Revealed type is "builtins.str"


One more cool thing about isinstance is that you can pass it a tuple of types to check whether the object is an instance of any of them:

isinstance(1, (str, int)) # True
PEP 427 introduced (and PEP 491 improved) a new format for Python distributions called "wheel".

Before the PEP, Python distributions were just tar.gz archives containing the source code of the distributed library, some additional files (README.rst, LICENSE, sometimes tests), and a setup.py file. To install the library from the distribution, pip had to download the archive, extract it into a temporary directory, and execute python setup.py install to install the package.

Did it work? Well, kind of. It worked well enough for pure Python packages, but if a package had C code, it had to be built on the target machine every time the package was installed, because the built binary highly depends on the target OS, architecture, and Python version.

The new wheel format significantly speeds up the process. It changed 2 significant things:

1. The file name for wheel packages is standardized. It contains the name and version of the package, the minimal required Python version (2.7, 3.8), the type of the Python interpreter (CPython, PyPy), the OS name, the architecture, and the ABI version. For example, flask-1.0.2-py2.py3-none-any.whl says "it is the flask package version 1.0.2 for both Python 2 and 3, any ABI, and any OS". That means Flask is a pure Python package, so it can be installed anywhere. Or psycopg2-2.8.6-cp310-cp310-linux_x86_64.whl says "it is psycopg2 version 2.8.6 for CPython 3.10 on Linux 64-bit". That means psycopg2 has some prebuilt C libraries for a very specific environment. A package can have multiple wheel distributions per version, and pip will pick and download the one that is made for you.

2. Instead of setup.py, the archive (which is now zip instead of tar.gz) contains already-parsed metadata. So, to install the package, it's enough to just extract it into the site-packages directory; no need to execute anything.

Currently, the wheel distribution format is well-adopted and available for almost all modern packages.
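
If you're curious how these file names are interpreted, the third-party packaging library (which pip uses internally) can parse them; a minimal sketch:

from packaging.utils import parse_wheel_filename

# split a wheel file name into its standardized components
name, version, build, tags = parse_wheel_filename(
    'flask-1.0.2-py2.py3-none-any.whl'
)
print(name, version)
# flask 1.0.2
print(sorted(str(t) for t in tags))
# ['py2-none-any', 'py3-none-any']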

When you create a new virtual environment, make sure you have the latest version of setuptools for tarballs and the latest version of the wheel package for wheels. No, really, do it. The wheel package is not installed by default in new venvs, and without it, installation of some packages will be slow and painful.

python3 -m venv .venv
.venv/bin/pip install -U pip setuptools wheel
PEP-518 introduced changes not to Python itself but rather to its ecosystem. The idea is pretty simple: let's store configs for all tools in the pyproject.toml file, in the tool.TOOL_NAME section. For example, for mypy:

[tool.mypy]
files = ["my_project"]
python_version = "3.8"

At this moment, almost all popular tools support pyproject.toml as the configuration file, in one way or another: mypy, pytest, coverage, isort, bandit, tox, etc. The only exception I know of is flake8.

Before pyproject.toml, many tools used setup.cfg for the same purpose, but this format (INI) has a few disadvantages compared to TOML: it's not well-standardized, and the only supported value type is strings.
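
For example, here is the same kind of config in both formats (the options shown are just for illustration):

; setup.cfg (INI): every value is a string
; that each tool must parse on its own
[mypy]
ignore_missing_imports = True
python_version = 3.8

# pyproject.toml (TOML): values are typed
[tool.mypy]
ignore_missing_imports = true   # a real boolean
files = ["my_project"]          # an array of strings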
PEP-517 and PEP-518 introduced the build-system section in pyproject.toml that tells package management tools (like pip) how to build wheel distributions for the project. For example, this is the section if you use flit:

[build-system]
requires = ["flit_core >=3.2,<4"]
build-backend = "flit_core.buildapi"


It tells pip to install flit_core of the given version and then call callbacks inside flit_core.buildapi, which should build the distribution for the project.

Having this section allows pip to build and install any Python project from the source, no matter what build system it uses. Before the PEP, tools like poetry and flit had to generate a special setup.py file for pip to be able to install the project from the source (or a non-wheel tarball distribution).
To recap: PEP-518 introduced pyproject.toml, and many Python tools started to use it to store their configs. The issue, however, is that there is no module in stdlib to parse TOML. So, different tools started to use different third-party packages for the task:

+ tomli (used by mypy) is a pure Python library that can only read TOML.
+ toml (used by most of the tools) can both read and write TOML.
+ tomlkit (used by poetry) can read, write, and modify TOML (preserving the original formatting and comments).

PEP 680 (landed in Python 3.11) introduced tomli into stdlib. But why tomli and not another library? It's pure Python and minimalistic. It cannot write TOML files, but reading is enough for most tools to work with pyproject.toml. And to avoid unpleasant conflicts when tomli is installed in the same environment, the name of the module was changed to tomllib.
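
Reading a config with it takes a few lines (note that tomllib requires the file to be opened in binary mode):

import tomllib

# parse pyproject.toml into a plain dict
with open('pyproject.toml', 'rb') as stream:
    pyproject = tomllib.load(stream)

print(pyproject['build-system']['build-backend'])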
The float type is infamous for being not as precise as you might expect. When you add 2 numbers, the result might contain a small error in precision. And the more numbers you add together, the higher the error:

sum([.9] * 1_000)
# 899.9999999999849

sum([.9] * 1_000_000)
# 900000.0000153045


If you want to minimize the error when summing together a list of floats, use math.fsum:

import math

math.fsum([.9] * 1_000_000)
# 900000.0
It's time for us to talk about async/await in Python. That's a big and difficult topic but a very important one if you're working with the network.

Everything your program does belongs to one of the two classes:

+ CPU-bound tasks. This is when you do a lot of computations, and the fan of your PC makes helicopter noises. You can speed up computations with multiprocessing, which is a pain in the ass to do correctly.

+ IO-bound tasks. This is when your code does nothing except wait for a response from the outside world. It includes making all kinds of network requests (sending logs, querying a database, crawling a website), serving network responses (like when you have a web app), and working with files. You can speed it up using async/await syntax.

The basics are quite simple:

1. If you define a function using async def instead of just def, it will return a "coroutine" when called instead of immediately running and calculating the result.

2. If, inside an async function, you call another async function prefixed with the await keyword, Python will request execution of this coroutine, switch to something else, and return the result when it is available.

3. The module asyncio contains some functions to work with async code and the scheduler that decides when to run which task.

This is a very basic overview. You can read the official asyncio documentation to learn more. In follow-up posts, we will cover most of asyncio functions, one by one.
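
To make the three points concrete, here is a minimal sketch (fetch_data is a made-up name, and the sleep stands in for real IO):

import asyncio

async def fetch_data():     # (1) calling it returns a coroutine
    await asyncio.sleep(1)  # (2) the scheduler may switch away here
    return 'data'

async def main():
    return await fetch_data()

asyncio.run(main())         # (3) asyncio schedules and runs it
# 'data'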
Async is like mold in your fridge or a GPL license in your dependencies. It propagates through your code, taking over every corner of it. You can call sync functions from async functions, but async functions can be called only from other async functions, using the await keyword.

This one returns a coroutine instead of a result:

async def welcome():
    return 'hello world'

def main():
    return welcome()

main()
# <coroutine object welcome at 0x...>

This is how main should look instead:

async def main():
    result = await welcome()
    return result

Alright, but how to call the root function? It also returns a coroutine! The answer is asyncio.run, which will take a coroutine, schedule it, and return its result:

import asyncio

coro = main()
result = asyncio.run(coro)
print(result)
# hello world

Keep in mind that asyncio.run should be called only once. You can't use it to call an async function from any sync function. Again, if you have an async function to call, all functions calling it (and all functions calling them, and so on) should also be async. Like mold.
Your best companion in learning asyncio is asyncio.sleep. It works like time.sleep, making the calling code wait the given number of seconds. This is the simplest example of an IO-bound task because while sleeping, your code literally does nothing but wait. And unlike time.sleep, asyncio.sleep is async. That means, while the calling task waits for it to finish, another task can be executed.

import asyncio
import time

async def main():
    start = time.time()
    await asyncio.sleep(2)
    return int(time.time() - start)

asyncio.run(main())
# 2


You can't yet see how the code switches to another task while waiting because we have only one task. But bear with me, in the next posts we'll get to it.
asyncio.gather is the function you will use the most. You pass it multiple coroutines; it schedules them, waits for all of them to finish, and returns the list of results in the same order.

import asyncio

URLS = ['google.com', 'github.com', 't.me']

async def check_alive(url):
    print(f'started {url}')
    i = URLS.index(url)
    await asyncio.sleep(3 - i)
    print(f'finished {url}')
    return i

async def main():
    coros = [check_alive(url) for url in URLS]
    statuses = await asyncio.gather(*coros)
    for url, alive in zip(URLS, statuses):
        print(url, alive)

asyncio.run(main())


Output:

started google.com
started github.com
started t.me
finished t.me
finished github.com
finished google.com
google.com 0
github.com 1
t.me 2


That's what happened:

1. asyncio.gather schedules all tasks in the order they are passed.
2. We made the first task wait 3 seconds, the second wait 2 seconds, and the last one wait 1 second. The tasks finished as soon as they could, without everyone waiting for the first task.
3. asyncio.gather waits for all tasks to finish.
4. asyncio.gather returns the list of results in the same order the coroutines were passed in. So, it's safe to zip the results with the input values.
When talking about asyncio functions, sometimes I used the word "coroutine" and sometimes "task". It's time to tell you the difference:

+ A coroutine is what an async function returns. It can be scheduled, switched, closed, and so on. Coroutines are quite similar to generators. In fact, await behaves much like yield from, and before the async/await syntax appeared, coroutines were plain generators decorated with @asyncio.coroutine.

+ asyncio.Future is like a "promise" in JS. It is an object that will eventually hold a coroutine result when it is available. It has a done method to check if the result is available, a result method to get the result, and so on.

+ asyncio.Task is like if coroutine and future had a baby. This is what asyncio mostly works with. It can be scheduled, switched, canceled, and holds its result when ready.
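
For example, here is the Future-like API of a task in action (a minimal sketch; asyncio.create_task is covered just below):

import asyncio

async def main():
    # wrap a coroutine into a Task; it gets scheduled immediately
    task = asyncio.create_task(asyncio.sleep(1, result='hi'))
    print(task.done())    # False: the result isn't available yet
    await task
    print(task.done())    # True
    print(task.result())  # 'hi'

asyncio.run(main())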

There is a cool function asyncio.create_task that can turn a coroutine into a proper task. What's cool about it is that this task immediately gets scheduled. So, if your code later encounters await, there is a chance your task will be executed at that point.

import asyncio

async def child():
    print('started child')
    await asyncio.sleep(1)
    print('finished child')

async def main():
    asyncio.create_task(child())
    print('before sleep')
    await asyncio.sleep(0)
    print('after sleep')

asyncio.run(main())


Output:

before sleep
started child
after sleep


What happened:

1. When create_task is called, it is scheduled but not yet executed.
2. When main hits await, the scheduler switches to child.
3. When child hits await, the scheduler switches to another task, which is main.
4. When main finishes, asyncio.run returns without waiting for child to finish. The child task is dead in space now.

But what if you want to make sure a scheduled task finishes before exiting? You can pass the task into good old asyncio.gather. And later we'll see some ways to wait for it with timeouts or when you don't care about the result.

task = asyncio.create_task(...)
...
await asyncio.gather(task)
In the previous post, we had the following code:

import asyncio

async def child():
    ...

async def main():
    asyncio.create_task(child())
    ...

Can you spot a bug?

Since we don't store a reference to the background task we create, the garbage collector may destroy the task before it finishes. To avoid that, we need to store a reference to the task until it finishes. The official documentation recommends the following pattern:

bg_tasks = set()

async def main():
    t = asyncio.create_task(child())

    # hold the reference to the task
    # in a global set
    bg_tasks.add(t)

    # automatically remove the task
    # from the set when it's done
    t.add_done_callback(bg_tasks.discard)

    ...
PEP-615 (landed in Python 3.9) introduced the zoneinfo module. The module provides access to information about time zones. It will try to use the time zone information provided by the OS. If it's not available, it falls back to the official Python tzdata package, which you need to install separately.

from zoneinfo import ZoneInfo
from datetime import datetime

ams = ZoneInfo('Europe/Amsterdam')
dt = datetime(2015, 10, 21, 13, 40, tzinfo=ams)
dt
# datetime(2015, 10, 21, 13, 40, tzinfo=ZoneInfo(key='Europe/Amsterdam'))

la = ZoneInfo('America/Los_Angeles')
dt.astimezone(la)
# datetime(2015, 10, 21, 4, 40, tzinfo=ZoneInfo(key='America/Los_Angeles'))

You should not use pytz anymore.
Daylight saving time (DST) is the practice of advancing clocks (typically by one hour) during warmer months so that darkness falls at a later clock time, and then turning them back for colder months. That means that sometimes, once a year, the clock shows the same time twice. It can also happen when the UTC offset of the current timezone is decreased.

To distinguish such situations, PEP-495 (landed in Python 3.6) introduced the fold attribute for datetime, which is 0 or 1 depending on whether this is the first or the second pass through the given time in the given timezone.

For example, in Amsterdam the time is shifted from CEST (Central European Summer Time) to CET (Central European Time) on the last Sunday of October:

from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

ams = ZoneInfo('Europe/Amsterdam')
d0 = datetime(2023, 10, 29, 0, 0, tzinfo=timezone.utc)
for h in range(3):
    du = d0 + timedelta(hours=h)
    dl = du.astimezone(ams)
    m = f'{du.time()} UTC is {dl.time()} {dl.tzname()} (fold={dl.fold})'
    print(m)


This code will print:

00:00:00 UTC is 02:00:00 CEST (fold=0)
01:00:00 UTC is 02:00:00 CET (fold=1)
02:00:00 UTC is 03:00:00 CET (fold=0)


However, you should keep in mind that fold is not considered in comparison operations:

d1 = datetime(2023, 10, 29, 2, 0, tzinfo=ams)
d2 = datetime(2023, 10, 29, 2, 0, fold=1, tzinfo=ams)
d1 == d2 # True


Now imagine that your system has a bug because of not handling this. That happens once a year. On Sunday. At night 🌚
Let's say you have the following mock:

from unittest.mock import Mock
user = Mock()
user.name = 'Guido'

You fully specified all attributes and methods it should have, and you pass it into the tested code, but then that code uses an attribute that you don't expect it to use:

user.age
# <Mock name='mock.age' id='...'>

Instead of failing with an AttributeError, the mock will create a new mock when an unspecified attribute is accessed. To fix it, you can (and should) use the unittest.mock.seal function (introduced in Python 3.7):

from unittest.mock import seal
seal(user)

user.name
# 'Guido'

user.occupation
# AttributeError: mock.occupation