Resolved: Check for condition during blocking call

Question:

I am writing some Python code to work with the Ethereum blockchain. After I send a transaction, I run a blocking call that waits for the transaction receipt to be verified. While my program is waiting to receive the receipt, I want to simultaneously check for a condition that would ultimately cancel the blocking call. I’ve been looking into using async functions or asyncio, but the solutions I’ve seen don’t seem to work in my case.
For more context, I have thread 1 which first builds the transaction and then makes a blocking call to wait for the receipt from said transaction:
def thread_1_func(pending_txns: dict, task_name):
  # Building transaction here and storing transaction hash in 'txn_hash'
  web3.eth.wait_for_transaction_receipt(txn_hash) # blocking call
  if pending_txns[task_name] == "Modified": # want to check this simultaneously with the blocking call
    sys.exit() # close the thread since the receipt will now never actually be received
I then have a second thread that only runs if the user modifies a certain file. The file modification indicates that we need to send a different transaction to modify that original sent transaction before it is verified and a receipt is received:
def thread_2_func(pending_txns: dict, task_name):
  # build modified version of transaction and send it, store new transaction hash in 'txn_hash'
  pending_txns[task_name] = "Modified" #This will validate the condition for the other thread to stop
  web3.eth.wait_for_transaction_receipt(txn_hash) #now this thread can block and wait for receipt
Also important to note, ‘pending_txns’ is a dictionary I currently use to share info about pending transactions amongst threads. I recognize that this certainly may not be threadsafe, but it has worked thus far for what I need.
I want to be able to continuously check the ‘pending_txns’ dict for an updated value so that I can stop thread 1. What would be the best way to achieve this?

Answer:

The web3.eth.Eth.wait_for_transaction_receipt function takes an optional timeout parameter. By default it waits for 120 seconds, then raises a web3.exceptions.TimeExhausted. What you can do is repeatedly wait for a short amount of time, catch the exception, and check whether you still have to wait for the receipt. This is easy to implement as long as you do not mind that the cancellation is not immediate. Here is a proof of concept:
import logging
import sys
import threading
import time


logger = logging.getLogger()  # for a clearer output


RECEIPT_WAITING_TIME = 5  # seconds


class FakeWeb3ExceptionTimeExhausted(Exception): pass


def fake_wait_for_transaction_receipt(timeout=120):
    logger.info("waiting ...")  # but it will never come
    lock = threading.Lock()
    acquired = lock.acquire()
    assert acquired
    re_acquired = lock.acquire(blocking=True, timeout=timeout)  # cannot re-acquire the lock (already held, not reentrant), so this blocks until the timeout
    if not re_acquired:
        raise FakeWeb3ExceptionTimeExhausted()


THREAD1_SHOULD_STOP_WAITING = False


def thread_1_func():
    while True:
        try:
            result = fake_wait_for_transaction_receipt(timeout=RECEIPT_WAITING_TIME)
        except FakeWeb3ExceptionTimeExhausted:
            logger.info("pause waiting")
            global THREAD1_SHOULD_STOP_WAITING
            if THREAD1_SHOULD_STOP_WAITING:
                logger.info("stop waiting <--")
                break
            else:
                logger.info("resume waiting")
        else:
            logger.info("finish waiting")
            # do something with the result
            return  # don't sys.exit here


def thread_2_func():
    time.sleep(15)  # let some time pass
    # then change your mind
    logger.info("--> cancel waiting")
    global THREAD1_SHOULD_STOP_WAITING
    THREAD1_SHOULD_STOP_WAITING = True
    return  # explicit


def main():
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter("%(threadName)s %(asctime)s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    thread1 = threading.Thread(target=thread_1_func, args=())
    thread1.start()
    thread2 = threading.Thread(target=thread_2_func, args=())
    thread2.start()

    thread1.join()
    thread2.join()
    sys.exit(0)  # exit here

main()

Output:
Thread-1 2022-02-04 12:09:46,813 waiting ...
Thread-1 2022-02-04 12:09:51,814 pause waiting
Thread-1 2022-02-04 12:09:51,814 resume waiting
Thread-1 2022-02-04 12:09:51,814 waiting ...
Thread-1 2022-02-04 12:09:56,814 pause waiting
Thread-1 2022-02-04 12:09:56,814 resume waiting
Thread-1 2022-02-04 12:09:56,814 waiting ...
Thread-1 2022-02-04 12:10:01,815 pause waiting
Thread-1 2022-02-04 12:10:01,815 resume waiting
Thread-1 2022-02-04 12:10:01,815 waiting ...
Thread-2 2022-02-04 12:10:01,828 --> cancel waiting
Thread-1 2022-02-04 12:10:06,815 pause waiting
Thread-1 2022-02-04 12:10:06,815 stop waiting <--
In the worst case, the cancellation takes effect RECEIPT_WAITING_TIME seconds after it is requested. You can lower this value if you wish, but because each loop iteration has some overhead, it should probably not go too low (sub-second).
An equivalent approach is to call web3.eth.Eth.get_transaction_receipt instead, which raises a web3.exceptions.TransactionNotFound while the receipt is not yet available. Catch the exception, check the global, and try again:
class FakeWeb3ExceptionTransactionNotFound(Exception): pass


def fake_get_transaction_receipt():
    logger.info("get ...")
    time.sleep(0.2)  # takes a bit of time
    raise FakeWeb3ExceptionTransactionNotFound()  # never found


def thread_1_func_v2():
    while True:
        try:
            result = fake_get_transaction_receipt()
        except FakeWeb3ExceptionTransactionNotFound:
            logger.info("not found")
            global THREAD1_SHOULD_STOP_WAITING
            if THREAD1_SHOULD_STOP_WAITING:
                logger.info("stop waiting <--")
                break
            else:
                logger.info("resume waiting")
        else:
            logger.info("finish waiting")
            # do something with the result
            return  # don't sys.exit here


def main2():
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter("%(threadName)s %(asctime)s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    thread1 = threading.Thread(target=thread_1_func_v2, args=())
    thread1.start()
    thread2 = threading.Thread(target=thread_2_func, args=())
    thread2.start()

    thread1.join()
    thread2.join()
    sys.exit(0)  # exit here

main2()

Output:
Thread-1 2022-02-04 12:19:59,014 get ...
Thread-1 2022-02-04 12:19:59,214 not found
Thread-1 2022-02-04 12:19:59,214 resume waiting
Thread-1 2022-02-04 12:19:59,214 get ...
Thread-1 2022-02-04 12:19:59,415 not found
Thread-1 2022-02-04 12:19:59,415 resume waiting
Thread-1 2022-02-04 12:19:59,415 get ...
[...]
Thread-1 2022-02-04 12:20:13,461 get ...
Thread-1 2022-02-04 12:20:13,661 not found
Thread-1 2022-02-04 12:20:13,661 resume waiting
Thread-1 2022-02-04 12:20:13,661 get ...
Thread-1 2022-02-04 12:20:13,862 not found
Thread-1 2022-02-04 12:20:13,862 resume waiting
Thread-1 2022-02-04 12:20:13,862 get ...
Thread-2 2022-02-04 12:20:14,029 --> cancel waiting
Thread-1 2022-02-04 12:20:14,062 not found
Thread-1 2022-02-04 12:20:14,063 stop waiting <--
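If you poll less often than in the run above but still want a cancellation to be noticed quickly, one option is to pause between polls with threading.Event.wait(timeout) instead of time.sleep, because Event.wait returns as soon as the event is set. This is a minimal sketch under the same assumptions as the proof of concept: poll_until_cancelled, get_receipt and poll_interval are hypothetical names, and the fake getter stands in for web3.eth.Eth.get_transaction_receipt.

```python
import threading
import time


class FakeTransactionNotFound(Exception):
    """Stand-in for web3.exceptions.TransactionNotFound."""


def fake_get_receipt():
    raise FakeTransactionNotFound()  # the receipt never arrives


def poll_until_cancelled(get_receipt, not_found_exc, cancel_event, poll_interval=0.2):
    """Poll get_receipt until it succeeds or cancel_event is set.

    Returns the receipt, or None if cancellation was requested first.
    """
    while not cancel_event.is_set():
        try:
            return get_receipt()
        except not_found_exc:
            # Pause for up to poll_interval, but wake immediately on cancel.
            cancel_event.wait(poll_interval)
    return None


cancel = threading.Event()
# Hypothetical second thread: request cancellation after 0.3 seconds.
canceller = threading.Timer(0.3, cancel.set)
start = time.monotonic()
canceller.start()
# A deliberately long interval shows that cancellation does not wait for it.
result = poll_until_cancelled(
    fake_get_receipt, FakeTransactionNotFound, cancel, poll_interval=10.0
)
elapsed = time.monotonic() - start
canceller.join()
print(result)  # None
```

Here the loop returns shortly after the 0.3 second mark even though the polling interval is 10 seconds. With a real node, a request that is already in flight would still have to complete before the event is checked again.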

Source: Stackoverflow.com