Code logger app

Hello,

I would like to ask for your kind help and expertise.

I bumped into a strange behavior of the below program.

from __future__ import annotations
from datetime import datetime
from typing import Final, List, NamedTuple
import json
import pathlib
import sys
import uuid

import trio
import trio_mysql

TIMEOUT: Final = 5  # seconds

SQL_HOST: Final = "0.0.0.0"
SQL_USER: Final = "xxx"
SQL_PASSWORD: Final = "0000"
SQL_DB: Final = "xxx"

SQL: Final = """INSERT INTO `table_scan`
    (`line_no`,
     `station_no`,
     `code`,
     `created_at`) VALUES (%s, %s, %s, %s)"""


class Reader(NamedTuple):
    line_no: int
    station_no: int
    ip: str
    port: int


class Result(NamedTuple):
    line_no: int
    station_no: int
    code: str
    at: str


# fmt: off
async def parent(readers: List[Reader]) -> None:
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(10)
        async with send_channel, receive_channel:
            for reader in readers:
                nursery.start_soon(code_producer_loop, nursery, send_channel.clone(), reader)
            nursery.start_soon(code_consumer_loop, nursery, receive_channel.clone())


async def code_producer_loop(nursery: trio.Nursery, send_channel: trio.MemorySendChannel, reader: Reader) -> None:
    async with send_channel:
        # It tries to connect to reader.
        # If it fails or reaches the timeout,
        # It returns and starts a new coroutine.
        with trio.move_on_after(TIMEOUT) as cancel_scope:
            try:
                stream = await trio.open_tcp_stream(reader.ip, reader.port)
            except OSError as error:
                print(f"Connection failed {error}")
                await trio.sleep_forever()

        # If it successfully connects to a reader,
        # It waits until it receives some value.
        # If there is value then consumes it.
        # '#565803881D###290618*071RL3I0027836*=\r\n'
        # It does some formatting, like removing the
        # leading and trailing whitespace, and sends
        # the values in form of Result (namedtuple).
        if not cancel_scope.cancelled_caught:
            while True:
                try:
                    if code := bytes.decode(await stream.receive_some(), encoding="utf-8").strip():
                        await send_channel.send(
                            Result(
                                reader.line_no,
                                reader.station_no,
                                code,
                                str(datetime.now()),
                            )
                        )
                except trio.BrokenResourceError as error:
                    print(f"Connection lost {error}")
                    await stream.aclose()
                    nursery.start_soon(code_producer_loop, nursery, send_channel.clone(), reader)
                    return

        else:
            print(f"Could not connect to {reader=} in {TIMEOUT} seconds.")
            nursery.start_soon(code_producer_loop, nursery, send_channel.clone(), reader)
            return


async def code_consumer_loop(nursery: trio.Nursery, receive_channel: trio.MemoryReceiveChannel) -> None:
    async with receive_channel:
        async for result in receive_channel:
            print(f"{result=}")
            nursery.start_soon(mysql_upload, nursery, result)


# noinspection PyArgumentList
async def mysql_upload(nursery: trio.Nursery, result: Result) -> None:
    connection = trio_mysql.connect(
        host=SQL_HOST,
        user=SQL_USER,
        password=SQL_PASSWORD,
        db=SQL_DB,
        charset='utf8mb4',
        cursorclass=trio_mysql.cursors.DictCursor
    )
    # It tries to save the result into the database.
    try:
        async with connection as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(SQL, (result.line_no, result.station_no, result.code, result.at))
            await conn.commit()

    # If it fails, It serializes the current result.
    except (trio.TooSlowError, trio_mysql.MySQLError) as err:
        print(f"MySQL server related exception: {err}")
        # If exception happens, it serializes the result into a json file.
        await trio.Path(f"DataCache\\record_{uuid.uuid4()}.json").write_text(json.dumps(result._asdict(), indent=4))

    # It there isn't problem connection problem,
    # then it tries to upload results serialized previously.
    # Note: no awaits between when
    # we start listing the directory
    # and when we unlink the files.
    else:
        for file in pathlib.Path("DataCache").glob("record_*-*-*-*-*.json"):
            result_from_json = Result(**json.loads(pathlib.Path(file).read_text()))
            nursery.start_soon(mysql_upload, nursery, result_from_json)
            pathlib.Path(file).unlink(missing_ok=True)


def main():
    pathlib.Path(r"DataCache").mkdir(exist_ok=True)

    # It tries to load the config file and creates List[Reader]
    try:
        readers = [Reader(**reader) for reader in json.loads(pathlib.Path("config.json").read_text())]

    except TypeError as err:
        print(f"{pathlib.Path('config.json').resolve()} content is wrong.")
        sys.exit()

    except FileNotFoundError:
        print(f"Missing {pathlib.Path('config.json').resolve()} file.")
        sys.exit()

    else:
        trio.run(parent, readers)


if __name__ == "__main__":
    main()

What I experience that I found odd records in the table_scan.

As you can see the is a huge difference (in created_at) between 93 551 and 93 552; 93 558 and 93 559.

2 and 4 hours earlier data.

I don’t know why and how it can happen.

Did I make some mistake? What should I alter?

this thread should be in the “Help and advice” category?