Hello,
First of all I would like to express my thankfulness because of Trio.
I cannot express how amazed I am.
I am a Process Engineer at an automotive company. I can describe myself as a mechanical engineer who codes in Python.
My relationship with Python relatively new, I have code for almost 1,5 years.
Currently I am working on a project which (demands) involves Trio (concurrency) at high level.
The task briefly: develop a program that runs a three-stationed assembly line with a bunch of sensors, barcode readers, zebra printers, scale. The work happens in a parallel way involving 2, 3 operators.
I have a small snippets which I would like to show. I am at the very beginning but I would like to avoid mind-slips so I would really appreciate any thoughts on it.
import trio
import trio_util
async def jobs_pool():
async with trio.open_nursery() as nursery_jobs: # type: trio.Nursery
async for _ in trio_util.periodic(1):
if bool(int(await trio.Path("probe.txt").read_text(encoding="utf-8"))): # sensor_1
print(f"{jobs_pool=} is about to instantiate a new job.")
await nursery_jobs.start(job)
async def job(task_status: trio._core._run._TaskStatus):
print(f"{job=} started.")
async with trio.open_nursery() as nursery_job: # type: trio.Nursery
# Why it doesn't work when trio.CancelScope
# wraps the trio.open_nursery()
with trio.CancelScope() as cancel_scope:
nursery_job.start_soon(sensor_1, task_status, nursery_job)
await read_dmc()
await tightening()
await tightening()
cancel_scope.shield = True
await print_label()
...
print(f"{job=} pending.")
await trio.sleep_forever()
async def sensor_1(task_status: trio._core._run._TaskStatus, nursery_job: trio.Nursery):
async for _ in trio_util.periodic(1):
if not bool(int(await trio.Path("probe.txt").read_text(encoding="utf-8"))):
# What should I start with?
# task_status.started() or
nursery_job.cancel_scope.cancel()
print(f"{nursery_job.parent_task} aborted.")
task_status.started()
nursery_job.cancel_scope.cancel()
async def read_dmc():
await trio.sleep(1)
print("DMC code read.")
async def tightening():
await trio.sleep(5)
print("Tightening end.")
async def print_label():
await trio.sleep(1)
print("Label printed.")
if __name__ == "__main__":
trio.run(jobs_pool)
What it does is actually to check the state of a sensor (currently probe.txt) in every 1 seconds.
Afterwards it tries to initiate a new job task. The jobs_pool cannot continue its work until the new job task instantizaton don’t reach a certain level (it has DMC code, and 2 tightenings, etc.).
Then the jobs_pool continues the monitoring of the sensor (probe.txt) and if there is signal, create another job.
Those jobs which successfully reach at a certain level, now sit in the jobs_pool and continue their work independently (from any other job).
This is the skeleton of the first station, and the code is actually the proof of concept. I have a lot of work with it.
Actually I would like to read your opinion about this code, what should I change, what do you do differently.
Any idea are welcomed.
I am open minded for any idea, better naming, conventions.
Thanks in advance.