Develope Assembly Line

Hello,

First of all I would like to express my thankfulness because of Trio.
I cannot express how amazed I am. :slight_smile:

I am a Process Engineer at an automotive company. I can describe myself as a mechanical engineer who codes in Python.

My relationship with Python relatively new, I have code for almost 1,5 years.

Currently I am working on a project which (demands) involves Trio (concurrency) at high level.

The task briefly: develop a program that runs a three-stationed assembly line with a bunch of sensors, barcode readers, zebra printers, scale. The work happens in a parallel way involving 2, 3 operators.

I have a small snippets which I would like to show. I am at the very beginning but I would like to avoid mind-slips so I would really appreciate any thoughts on it.

import trio
import trio_util


async def jobs_pool():
    async with trio.open_nursery() as nursery_jobs: # type: trio.Nursery
        async for _ in trio_util.periodic(1):
            if bool(int(await trio.Path("probe.txt").read_text(encoding="utf-8"))): # sensor_1
                print(f"{jobs_pool=} is about to instantiate a new job.")
                await nursery_jobs.start(job)


async def job(task_status: trio._core._run._TaskStatus):
    print(f"{job=} started.")
    async with trio.open_nursery() as nursery_job: # type: trio.Nursery
        # Why it doesn't work when trio.CancelScope
        # wraps the trio.open_nursery()
        with trio.CancelScope() as cancel_scope:
            nursery_job.start_soon(sensor_1, task_status, nursery_job)
            await read_dmc()
            await tightening()
            await tightening()
            cancel_scope.shield = True
            await print_label()
            ...
            print(f"{job=} pending.")
            await trio.sleep_forever()


async def sensor_1(task_status: trio._core._run._TaskStatus, nursery_job: trio.Nursery):
    async for _ in trio_util.periodic(1):
        if not bool(int(await trio.Path("probe.txt").read_text(encoding="utf-8"))):
        # What should I start with?
        # task_status.started() or 
        nursery_job.cancel_scope.cancel()
        print(f"{nursery_job.parent_task} aborted.")
        task_status.started()
        nursery_job.cancel_scope.cancel()


async def read_dmc():
    await trio.sleep(1)
    print("DMC code read.")


async def tightening():
    await trio.sleep(5)
    print("Tightening end.")


async def print_label():
    await trio.sleep(1)
    print("Label printed.")


if __name__ == "__main__":
    trio.run(jobs_pool)

What it does is actually to check the state of a sensor (currently probe.txt) in every 1 seconds.
Afterwards it tries to initiate a new job task. The jobs_pool cannot continue its work until the new job task instantizaton don’t reach a certain level (it has DMC code, and 2 tightenings, etc.).
Then the jobs_pool continues the monitoring of the sensor (probe.txt) and if there is signal, create another job.

Those jobs which successfully reach at a certain level, now sit in the jobs_pool and continue their work independently (from any other job).

This is the skeleton of the first station, and the code is actually the proof of concept. I have a lot of work with it.

Actually I would like to read your opinion about this code, what should I change, what do you do differently.

Any idea are welcomed.

I am open minded for any idea, better naming, conventions.

Thanks in advance.

Did I make any mistake by posting here?
Wrong topic? (Should I post it elsewhere?)

I would appreciate some thoughts.
Maybe some good projects (involved lots of Trio) to see.

Thanks in advance?

You did not make any mistake, and you’re in the right place! This is very interesting, and we’ve been meaning to answer, but did not do it yet. Sorry about that!

So my main concern that I don’t know what structure, approach to use.
I mean I have three stations.

1st station:

  1. 3 sensors
  2. a code reader
  3. a screwing device
  4. a printer device

2nd station:

  1. a code reader
  2. 3 sensors
  3. a press machine

3rd station:

  1. a code reader
  2. a weight scale
  3. a printer device

With these three stations and all the steps we can create a final part.

I imagine something ie. an instance of FinalPart that describe the final products: part number, results of screwing, timestamps, operators ID, pressing results, weight measure results, etc.

I think that I have to push this instance through all the steps.

It is very crucial to find a good pattern because it can happen that in the future I will have to implement for instance another sensor. And I want to avoid to recode the previous part whenever I modifie something.

What do you think where I should start at?

I really appreciate your help.

So let me see if I get this right. Your code periodically reads probe.txt and if it contains a nonzero number, it spawns a new task. The new task then has to call read_dmc() and then tightening() twice before the next task can be spawned. But It is unclear to me why you are reading probe.txt continuously even in the subtask. What is the intended logic here?

I use the probe.txt in order to substitute a part presence sensor.

First it is necessary to monitor it because of spawning a new subtask. On the other hand it is necessary to monitor it even in the subtask because I have to know when the operator remove the part from its place. If it happens then I have to abort the current subtask.
Afterwards I continues the reading of sensor (probe.txt) whether a new part placed in or not.

As I have mentioned it is a three-stationed assembly line. Everything happens in parallel. But I have to ensure that for instance only one part can occupy the first station at a time. But if this certain part get all the necessary operations, it then can wait for indefinite time. So there is no FIFO or LIFO or anything else.

I think I will modify the first part of code in order to use it the trio.CapacityLimiter.

I don’t know where to store those jobs which are semi finished. My thought was a memory channel but it doesn’t work because I don’t know that which part is that reach the 2nd station.
I mean there are 5 jobs in the memory channel and I have to read out all the jobs and pick out the right one (based on a barcode) but then I have to do something with the remaining 4 jobs which are then no longer being in the memory channel but the 2nd’s station functions.
It would be convenient if I could read the objects being in the memory channel without I pick out them.

I really appreciate any thought.

I did an illustration.

I think I will have three coroutines each describes the behaviour of a station.
Currently I don’t know what structure I should use to store the results between the stations.
One obvious solution would be to use some kind of list (inside which some container data type (dataclass)) and pass the reference of it to each coroutines.

trio.open_memory_channel doesn’t work for me in this certain case because I have iterate over the whole list until I find the right instance. I have to leave untouched the other instances.
Each station has an entry point at which I can identify the necessary instance (actually based on a barcode). So I have to look for this barcode in the List[Parts].

I don’t know whether there is a better solution?

Another thoughts that came into my mind that should I use a class to describe the behaviour of stations? Is it possible to use class which has coroutines as methods?

Thanks in advance.

Hmm. Your “job” procedure doesn’t call task_status.started().

A Dict[barcode, Part] would make more sense than a List[Part].

Of course you can use coroutines as methods!