Run cryptofeed in a concurrent thread: There is no current event loop in thread ‘ThreadPoolExecutor-0_0’

I’m trying to use cryptofeed to download data concurrently.

f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()

The code above runs successfully. However, I want to run it in the background, so I am using concurrent.futures:

executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(f.run)

However, I got an error:

job2.result()


---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-54-f96e35ee3c66> in <module>
----> 1 job2.result()

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    433 
    434             self._condition.wait(timeout)

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/anaconda3/lib/python3.8/concurrent/futures/thread.py in run(self)
     55 
     56         try:
---> 57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)

~/anaconda3/lib/python3.8/site-packages/cryptofeed/feedhandler.py in run(self, start_loop, install_signal_handlers, exception_handler)
    145             raise ValueError(txt)
    146 
--> 147         loop = asyncio.get_event_loop()
    148         # Good to enable when debugging or without code change: export PYTHONASYNCIODEBUG=1)
    149         # loop.set_debug(True)

~/anaconda3/lib/python3.8/asyncio/events.py in get_event_loop(self)
    637 
    638         if self._local._loop is None:
--> 639             raise RuntimeError('There is no current event loop in thread %r.'
    640                                % threading.current_thread().name)
    641 

RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

Could anyone help me? Thanks so much!

Edit: following the suggestion in the answer, I tried:

def threadable():    
    f = FeedHandler()
    f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
    f.run()


executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(threadable)
job2.done()
job2.result()

I still get the same event loop error. Is it solvable?

RuntimeError                              Traceback (most recent call last)
<ipython-input-47-05c023dd326f> in <module>
     11 job2.done()
     12 
---> 13 job2.result()

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    437                 raise CancelledError()
    438             elif self._state == FINISHED:
--> 439                 return self.__get_result()
    440             else:
    441                 raise TimeoutError()

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/anaconda3/lib/python3.8/concurrent/futures/thread.py in run(self)
     55 
     56         try:
---> 57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)

<ipython-input-47-05c023dd326f> in threadable()
      2     f = FeedHandler()
      3     f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
----> 4     f.run()
      5 
      6 

~/anaconda3/lib/python3.8/site-packages/cryptofeed/feedhandler.py in run(self, start_loop, install_signal_handlers, exception_handler)
    145             raise ValueError(txt)
    146 
--> 147         loop = asyncio.get_event_loop()
    148         # Good to enable when debugging or without code change: export PYTHONASYNCIODEBUG=1)
    149         # loop.set_debug(True)

~/anaconda3/lib/python3.8/asyncio/events.py in get_event_loop(self)
    637 
    638         if self._local._loop is None:
--> 639             raise RuntimeError('There is no current event loop in thread %r.'
    640                                % threading.current_thread().name)
    641 

RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-1_0'.

Answer

In the single-threaded version of your code, all three of these statements execute in the same thread in a simple sequential fashion:

f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()

In the multithreaded version, you submit only the last line to the Executor, and therefore it will run in a secondary thread. But these statements, as far as I can tell from the code you provided, still execute in the main thread:

f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))

How do you know that will work? In general, it depends on the implementation details of Gateio and FeedHandler. You need to be very careful about splitting a program into pieces that run in different threads, especially when third-party library calls are involved. So, good luck with that.
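To check which thread each piece of a split program actually runs in, you can record the thread name on both sides. This is a minimal standard-library sketch; `where` is a hypothetical helper, not part of cryptofeed.

```python
import concurrent.futures
import threading


def where():
    # Report the name of the thread that executes this call.
    return threading.current_thread().name


# Runs in the calling thread, like FeedHandler() and add_feed() in the question.
setup_thread = where()

# Runs in a pool worker, like the submitted f.run in the question.
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    run_thread = executor.submit(where).result()

print(setup_thread, run_thread)
```

The two names differ, which is exactly the situation the original `executor.submit(f.run)` creates: setup in one thread, execution in another.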

You could try this:

def threadable():
    f = FeedHandler()
    f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
    f.run()

...

executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(threadable)

Then, at least, your entire sequence of steps will execute in the SAME thread.
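Keeping everything in one thread does not by itself give that thread an event loop. The traceback in the question ends in `asyncio.get_event_loop()`, and asyncio only creates a loop automatically in the main thread, so a worker thread has to create and register its own. Below is a minimal sketch using only the standard library. `run_in_worker` and the `asyncio.sleep` call are hypothetical stand-ins for `threadable()` and `f.run()`. Whether cryptofeed then runs cleanly off the main thread is an assumption I have not verified; the `install_signal_handlers` parameter visible in the traceback signature suggests it may need to be `False`, since Python only allows signal handlers to be installed from the main thread.

```python
import asyncio
import concurrent.futures


def run_without_loop():
    # Mirrors the traceback: asking for the current loop in a worker
    # thread that has none raises RuntimeError.
    return asyncio.get_event_loop()


def run_in_worker():
    # Hypothetical stand-in for threadable(): create and register a loop
    # for this thread before any code asks for the current loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Placeholder for f.run(); get_event_loop() calls made in this
        # thread now find the loop registered above.
        assert asyncio.get_event_loop() is loop
        return loop.run_until_complete(asyncio.sleep(0, result="ok"))
    finally:
        loop.close()
        asyncio.set_event_loop(None)


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    failing = executor.submit(run_without_loop)
    working = executor.submit(run_in_worker)
    error = failing.exception()  # the RuntimeError raised in the worker
    outcome = working.result()   # the value returned by the coroutine
```

In the real code, the two loop setup lines would go at the top of `threadable()`, before `FeedHandler()` is constructed.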

I would be worried about those callbacks, however. They will now run in the secondary thread, and you need to understand the consequences of that. Do they interact with a user interface program? Your UI may not support multithreading.

The use of the Executor protocol is a bit weird here, since your function doesn’t return a value. The Executors are most useful when they are used to aggregate returned values. You may be better off just launching the threads you need using methods in the threading module.
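As a rough sketch of the `threading` alternative, assuming the worker needs its own event loop as the traceback suggests: `feed_worker`, the `results` list, and the `asyncio.sleep` placeholder are hypothetical and stand in for the function that builds the FeedHandler and calls `f.run()`.

```python
import asyncio
import threading


def feed_worker(results):
    # Each thread that uses asyncio needs its own registered loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Placeholder for the long-running f.run() call.
        results.append(loop.run_until_complete(asyncio.sleep(0, result="done")))
    finally:
        loop.close()


results = []
# daemon=True lets the interpreter exit even if the worker is still running.
thread = threading.Thread(
    target=feed_worker, args=(results,), name="feed-thread", daemon=True
)
thread.start()
thread.join()  # only for this demo; a real feed would be left running
```

With a feed that runs indefinitely you would skip the `join()` and let the thread work in the background while the main thread carries on.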