programing

멀티프로세서 풀과 유사한 스레드 풀?

itsource 2022. 10. 25. 21:54
반응형

멀티프로세서 풀과 유사한 스레드 풀?

멀티프로세서 모듈의 Pool 클래스와 유사한 워커 스레드용 Pool 클래스가 있습니까?

예를 들어 지도 기능을 병렬화하는 쉬운 방법을 좋아합니다.

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

하지만 새로운 프로세스를 만드는 데 드는 오버헤드 없이 하고 싶습니다.

난 GIL에 대해 알아.단, 제 사용 예에서는 함수는 실제 함수 호출 전에 python 래퍼에서 GIL을 해제하는 IO-bound C 함수입니다.

스레드 풀을 직접 작성해야 합니까?

실제로 스레드 기반 풀인터페이스가 있는 것을 방금 알게 되었습니다.multiprocessing모듈. 단, 어느 정도 숨겨져 있어 적절히 문서화되어 있지 않습니다.

를 통해 Import할 수 있습니다.

from multiprocessing.pool import ThreadPool

파이썬 스레드를 래핑하는 더미 프로세스 클래스를 사용하여 구현됩니다.이 스레드 기반의 프로세스클래스는, 에서 간단하게 설명되고 있습니다.이 더미 모듈은 스레드에 기반한 전체 멀티프로세싱 인터페이스를 제공합니다.

Python 3에서는 다음을 사용할 수 있습니다.

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)

상세한 것에 대하여는, 문서를 참조해 주세요.

네, API도 거의 같은 것 같습니다.

import multiprocessing

def worker(lnk):
    ....    
def start_process():
    .....
....

if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)

pool.map(worker, inputs)
....

매우 심플하고 가벼운 것(여기서 약간 변경):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()

작업 완료 시 콜백을 지원하려면 작업 태플에 콜백을 추가하면 됩니다.

Python에서 스레드 풀을 사용하려면 다음 라이브러리를 사용할 수 있습니다.

from multiprocessing.dummy import Pool as ThreadPool

그리고 이 라이브러리는 다음과 같이 사용합니다.

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results

스레드는 원하는 스레드 수입니다. 태스크는 서비스에 가장 많이 매핑되는 태스크 목록입니다.

이게 내가 마침내 사용하게 된 결과야.위 dgorissen에 의해 수정된 수업입니다.

파일:threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread


class Worker(Thread):
    _TIMEOUT = 2
    """ Thread executing tasks from a given tasks queue. Thread is signalable, 
        to exit
    """
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):       
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                   timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()


def create_task(func, *args, **kwargs):
    return (func, args, kwargs)

풀을 사용하려면

from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()

예, 멀티프로세서 풀과 유사한 스레드 풀이 있지만 어느 정도 숨겨져 있고 적절하게 문서화되어 있지 않습니다.다음의 방법으로 Import 할 수 있습니다.

from multiprocessing.pool import ThreadPool

간단한 예를 보여드리겠습니다.

def test_multithread_stringio_read_csv(self):
        # see gh-11786
        max_row_range = 10000
        num_files = 100

        bytes_to_df = [
            '\n'.join(
                ['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
            ).encode() for j in range(num_files)]
        files = [BytesIO(b) for b in bytes_to_df]

        # read all files in many threads
        pool = ThreadPool(8)
        results = pool.map(self.read_csv, files)
        first_result = results[0]

        for result in results:
            tm.assert_frame_equal(first_result, result) 

스레드 큐 풀에 프로세스를 추가하는 방법도 있습니다.

import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(10):
        a = executor.submit(arg1, arg2,....)

새로운 프로세스를 생성하는 데 드는 오버헤드는 최소이며, 특히 프로세스 중 4개만 사용하는 경우에는 최소입니다.이것이 당신의 어플리케이션의 퍼포먼스 핫 스팟이라고는 생각되지 않습니다.심플함을 유지하면서 프로파일링 결과가 가리키는 위치와 위치를 최적화합니다.

을 사용법 「」/「」, 「」, 「」, 「」, 「」의 제조자/는 것은 수 .Queue를 누릅니다

송신원: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

다른 사람의 코드를 실행해도 괜찮으시다면, 여기 제 코드가 있습니다.

주의: 삭제해야 할 추가 코드가 많이 있습니다.[명확한 설명과 동작의 데모를 위해 추가]

주의: 메서드 이름 및 변수 이름에는 camelCase 대신 Python 명명 규칙이 사용되었습니다.

작업 절차:

  1. 멀티스레드 클래스는 잠금, 작업 큐, 종료 플래그 및 결과를 공유함으로써 스레드 인스턴스 없이 시작됩니다.
  2. 싱글 스레드는 모든 인스턴스를 생성하면 멀티 스레드에 의해 시작됩니다.
  3. Multi Thread를 사용하여 작품을 추가할 수 있습니다(잠금 처리).
  4. SingleThreads는 가운데 잠금을 사용하여 작업 큐를 처리합니다.
  5. 작업이 완료되면 공유 부울 값을 가진 모든 스레드를 파기할 수 있습니다.
  6. 여기, 일은 무엇이든 될 수 있다.지정된 인수를 사용하여 모듈을 자동으로 Import(비주석 Import 행)하여 처리할 수 있습니다.
  7. 결과가 결과에 추가되고 get_results를 사용할 수 있습니다.

코드:

import threading
import queue


class SingleThread(threading.Thread):
    def __init__(self, name, work_queue, lock, exit_flag, results):
        threading.Thread.__init__(self)
        self.name = name
        self.work_queue = work_queue
        self.lock = lock
        self.exit_flag = exit_flag
        self.results = results

    def run(self):
        # print("Coming %s with parameters %s", self.name, self.exit_flag)
        while not self.exit_flag:
            # print(self.exit_flag)
            self.lock.acquire()
            if not self.work_queue.empty():
                work = self.work_queue.get()
                module, operation, args, kwargs = work.module, work.operation, work.args, work.kwargs
                self.lock.release()
                print("Processing : " + operation + " with parameters " + str(args) + " and " + str(kwargs) + " by " + self.name + "\n")
                # module = __import__(module_name)
                result = str(getattr(module, operation)(*args, **kwargs))
                print("Result : " + result + " for operation " + operation + " and input " + str(args) + " " + str(kwargs))
                self.results.append(result)
            else:
                self.lock.release()
        # process_work_queue(self.work_queue)

class MultiThread:
    def __init__(self, no_of_threads):
        self.exit_flag = bool_instance()
        self.queue_lock = threading.Lock()
        self.threads = []
        self.work_queue = queue.Queue()
        self.results = []
        for index in range(0, no_of_threads):
            thread = SingleThread("Thread" + str(index+1), self.work_queue, self.queue_lock, self.exit_flag, self.results)
            thread.start()
            self.threads.append(thread)

    def add_work(self, work):
        self.queue_lock.acquire()
        self.work_queue._put(work)
        self.queue_lock.release()

    def destroy(self):
        self.exit_flag.value = True
        for thread in self.threads:
            thread.join()

    def get_results(self):
        return self.results


class Work:
    def __init__(self, module, operation, args, kwargs={}):
        self.module = module
        self.operation = operation
        self.args = args
        self.kwargs = kwargs


class SimpleOperations:
    def sum(self, *args):
        return sum([int(arg) for arg in args])

    @staticmethod
    def mul(a, b, c=0):
        return int(a) * int(b) + int(c)


class bool_instance:
    def __init__(self, value=False):
        self.value = value

    def __setattr__(self, key, value):
        if key != "value":
            raise AttributeError("Only value can be set!")
        if not isinstance(value, bool):
            raise AttributeError("Only True/False can be set!")
        self.__dict__[key] = value
        # super.__setattr__(key, bool(value))

    def __bool__(self):
        return self.value

if __name__ == "__main__":
    multi_thread = MultiThread(5)
    multi_thread.add_work(Work(SimpleOperations(), "mul", [2, 3], {"c":4}))
    while True:
        data_input = input()
        if data_input == "":
            pass
        elif data_input == "break":
            break
        else:
            work = data_input.split()
            multi_thread.add_work(Work(SimpleOperations(), work[0], work[1:], {}))
    multi_thread.destroy()
    print(multi_thread.get_results())

언급URL : https://stackoverflow.com/questions/3033952/threading-pool-similar-to-the-multiprocessing-pool

반응형