Hello,
I would like to ask for your kind help and expertise.
I have run into strange behavior in the program below.
from __future__ import annotations
from datetime import datetime
from typing import Final, List, NamedTuple
import json
import pathlib
import sys
import uuid
import trio
import trio_mysql
# Connection-attempt budget for each reader TCP connect (see code_producer_loop).
TIMEOUT: Final = 5  # seconds
# MySQL connection parameters.
# NOTE(review): credentials are hard-coded; consider loading them from the
# environment or a config file instead of the source.
SQL_HOST: Final = "0.0.0.0"
SQL_USER: Final = "xxx"
SQL_PASSWORD: Final = "0000"
SQL_DB: Final = "xxx"
# Parameterized INSERT used by mysql_upload; values are bound by the driver,
# so no string interpolation / SQL injection risk here.
SQL: Final = """INSERT INTO `table_scan`
(`line_no`,
`station_no`,
`code`,
`created_at`) VALUES (%s, %s, %s, %s)"""
class Reader(NamedTuple):
    """Static configuration for one scanner/reader TCP endpoint, loaded from config.json."""

    # Production line this reader belongs to.
    line_no: int
    # Station position on that line.
    station_no: int
    # TCP address of the reader device.
    ip: str
    port: int
class Result(NamedTuple):
    """One scanned code plus where and when it was captured."""

    line_no: int
    station_no: int
    # Decoded, whitespace-stripped payload received from the reader.
    code: str
    # Capture timestamp: str(datetime.now()) — naive local time, stored as text.
    at: str
# fmt: off
async def parent(readers: List[Reader]) -> None:
    """Spawn one producer task per configured reader plus a single DB consumer.

    Each task receives its own clone of the channel endpoint, so the channel
    only signals end-of-stream once every producer clone has been closed.
    The originals are closed here immediately after the tasks are started.
    """
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(10)
        async with send_channel, receive_channel:
            for rdr in readers:
                nursery.start_soon(
                    code_producer_loop, nursery, send_channel.clone(), rdr
                )
            nursery.start_soon(
                code_consumer_loop, nursery, receive_channel.clone()
            )
async def code_producer_loop(nursery: trio.Nursery, send_channel: trio.MemorySendChannel, reader: Reader) -> None:
    """Connect to one reader, forward scanned codes, and reconnect on any failure.

    On connect failure, timeout, lost connection, or clean remote close, a
    fresh copy of this task is started with a new channel clone and this one
    returns (closing its own clone via ``async with``).
    """
    async with send_channel:
        # Bound the connection attempt: on timeout the scope cancels the
        # sleep_forever below and cancelled_caught becomes True.
        with trio.move_on_after(TIMEOUT) as cancel_scope:
            try:
                stream = await trio.open_tcp_stream(reader.ip, reader.port)
            except OSError as error:
                print(f"Connection failed {error}")
                # Park until the timeout fires so we fall through to the
                # reconnect branch instead of using an un-opened stream.
                await trio.sleep_forever()
        if not cancel_scope.cancelled_caught:
            while True:
                try:
                    data = await stream.receive_some()
                    # BUGFIX: b"" means the peer closed the connection
                    # cleanly.  The original code decoded/stripped it to ""
                    # and looped forever busy-reading empty chunks, never
                    # reconnecting.  Treat it like a lost connection.
                    if not data:
                        print(f"Connection closed by {reader=}")
                        await stream.aclose()
                        nursery.start_soon(code_producer_loop, nursery, send_channel.clone(), reader)
                        return
                    # Frames look like '#565803881D###290618*071RL3I0027836*=\r\n';
                    # strip surrounding whitespace before forwarding.
                    if code := data.decode("utf-8").strip():
                        await send_channel.send(
                            Result(
                                reader.line_no,
                                reader.station_no,
                                code,
                                # NOTE(review): naive local time — consider a
                                # timezone-aware UTC timestamp to avoid
                                # DST/locale skew in created_at.
                                str(datetime.now()),
                            )
                        )
                except trio.BrokenResourceError as error:
                    print(f"Connection lost {error}")
                    await stream.aclose()
                    nursery.start_soon(code_producer_loop, nursery, send_channel.clone(), reader)
                    return
        else:
            print(f"Could not connect to {reader=} in {TIMEOUT} seconds.")
            nursery.start_soon(code_producer_loop, nursery, send_channel.clone(), reader)
            return
async def code_consumer_loop(nursery: trio.Nursery, receive_channel: trio.MemoryReceiveChannel) -> None:
    """Drain the channel and fan each scan result out to its own upload task.

    Returns once every producer-side clone has been closed (end of channel).
    """
    async with receive_channel:
        while True:
            try:
                result = await receive_channel.receive()
            except trio.EndOfChannel:
                break
            print(f"{result=}")
            nursery.start_soon(mysql_upload, nursery, result)
# noinspection PyArgumentList
async def mysql_upload(nursery: trio.Nursery, result: Result) -> None:
    """Insert one scan result; cache it to disk on failure, replay the cache on success.

    ANSWER TO THE POSTED QUESTION: rows whose created_at is hours older than
    their neighbours come from this replay path — when the DB was unreachable,
    results were serialized with their original `at` timestamp and inserted
    later, once a subsequent upload succeeded.  That is by design here.
    """
    connection = trio_mysql.connect(
        host=SQL_HOST,
        user=SQL_USER,
        password=SQL_PASSWORD,
        db=SQL_DB,
        charset='utf8mb4',
        cursorclass=trio_mysql.cursors.DictCursor
    )
    try:
        async with connection as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(SQL, (result.line_no, result.station_no, result.code, result.at))
                await conn.commit()
    except (trio.TooSlowError, trio_mysql.MySQLError) as err:
        print(f"MySQL server related exception: {err}")
        # BUGFIX: build the cache path by joining, not with a literal "\\"
        # separator.  The original f"DataCache\\record_...json" only worked on
        # Windows; elsewhere it created a file literally named
        # "DataCache\record_....json" in the CWD, which the replay glob below
        # could never find — silently losing cached records.
        await (trio.Path("DataCache") / f"record_{uuid.uuid4()}.json").write_text(
            json.dumps(result._asdict(), indent=4)
        )
    # If there was no connection problem, replay previously cached results.
    # Note: no awaits between listing the directory and unlinking the files,
    # so concurrent upload tasks cannot replay the same file twice.
    else:
        for file in pathlib.Path("DataCache").glob("record_*-*-*-*-*.json"):
            result_from_json = Result(**json.loads(file.read_text()))
            nursery.start_soon(mysql_upload, nursery, result_from_json)
            file.unlink(missing_ok=True)
def main() -> None:
    """Load the reader configuration and run the trio event loop.

    Exits with status 1 (not 0, as the original did) when config.json is
    missing or malformed.
    """
    pathlib.Path("DataCache").mkdir(exist_ok=True)
    config_path = pathlib.Path("config.json")
    try:
        readers = [Reader(**reader) for reader in json.loads(config_path.read_text())]
    # TypeError: keys don't match Reader fields; JSONDecodeError: invalid JSON
    # (previously uncaught, crashing with a traceback).
    except (TypeError, json.JSONDecodeError) as err:
        print(f"{config_path.resolve()} content is wrong: {err}")
        sys.exit(1)
    except FileNotFoundError:
        print(f"Missing {config_path.resolve()} file.")
        sys.exit(1)
    else:
        trio.run(parent, readers)
if __name__ == "__main__":
    main()
What I experience is that I find odd records in table_scan.
As you can see, there is a huge difference (in created_at) between rows 93 551 and 93 552, and between 93 558 and 93 559:
data that is 2 and 4 hours older, respectively.
I don't know why or how this can happen.
Did I make a mistake somewhere? What should I change?