B /g^tm@sdZdZddlZddlZddlmZddlZddlmZddlZ ddl m Z ddl m Z ddlZddlZddlmZddlZddlZddlZeZd aGd d d Zd d ZdZdZGdddeZGdddZddZGddde Z!Gddde Z"Gddde Z#Gddde Z$ddZ%d d!Z&d2d"d#Z'd$d%Z(d&d'Z)d(d)Z*d a+da,d*d+Z-d,d-Z.Gd.d/d/ej/Z0Gd0d1d1ej1Z2e3edS)3a* Implements ProcessPoolExecutor. The follow diagram and text describe the data-flow through the system: |======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ | | => | Work Ids | | | | Call Q | | Process | | | +----------+ | | +-----------+ | Pool | | | | ... | | | | ... | +---------+ | | | 6 | => | | => | 5, call() | => | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | | Executor | | Thread | | | | | +----------- + | | +-----------+ | | | | <=> | Work Items | <=> | | <= | Result Q | <= | | | | +------------+ | | +-----------+ | | | | | 6: call() | | | | ... | | | | | | future | | | | 4, result | | | | | | ... | | | | 3, except | | | +----------+ +------------+ +--------+ +-----------+ +---------+ Executor.submit() called: - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict - adds the id of the _WorkItem to the "Work Ids" queue Local worker thread: - reads work ids from the "Work Ids" queue and looks up the corresponding WorkItem from the "Work Items" dict: if the work item has been cancelled then it is simply removed from the dict, otherwise it is repackaged as a _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). - reads _ResultItems from "Result Q", updates the future stored in the "Work Items" dict and deletes the dict entry Process #1..n: - reads _CallItems from "Call Q", executes the calls, and puts the resulting _ResultItems in "Result Q" z"Brian Quinlan (brian@sweetapp.com)N)_base)Full)wait)Queue)partialFc@s,eZdZddZddZddZddZd S) _ThreadWakeupcCstjdd\|_|_dS)NF)Zduplex)mpZPipe_reader_writer)selfr 2/usr/lib64/python3.7/concurrent/futures/process.py__init__Rsz_ThreadWakeup.__init__cCs|j|jdS)N)r closer )r r r r rUs z_ThreadWakeup.closecCs|jddS)N)r Z send_bytes)r r r r wakeupYsz_ThreadWakeup.wakeupcCsx|jr|jqWdS)N)r ZpollZ recv_bytes)r r r r clear\s z_ThreadWakeup.clearN)__name__ __module__ __qualname__rrrrr r r r rQsrcCsHdatt}x|D]\}}|qWx|D]\}}|q0WdS)NT)_global_shutdownlist_threads_wakeupsitemsrjoin)r_ thread_wakeuptr r r _python_exitas   r=c@seZdZddZddZdS)_RemoteTracebackcCs ||_dS)N)tb)r r"r r r rzsz_RemoteTraceback.__init__cCs|jS)N)r")r r r r __str__|sz_RemoteTraceback.__str__N)rrrrr#r r r r r!ysr!c@seZdZddZddZdS)_ExceptionWithTracebackcCs0tt|||}d|}||_d||_dS)Nz """ %s""") tracebackformat_exceptiontyperexcr")r r)r"r r r rs z _ExceptionWithTraceback.__init__cCst|j|jffS)N) _rebuild_excr)r")r r r r __reduce__sz"_ExceptionWithTraceback.__reduce__N)rrrrr+r r r r r$sr$cCst||_|S)N)r! __cause__)r)r"r r r r*s r*c@seZdZddZdS) _WorkItemcCs||_||_||_||_dS)N)futurefnargskwargs)r r.r/r0r1r r r rsz_WorkItem.__init__N)rrrrr r r r r-sr-c@seZdZdddZdS) _ResultItemNcCs||_||_||_dS)N)work_id exceptionresult)r r3r4r5r r r rsz_ResultItem.__init__)NN)rrrrr r r r r2sr2c@seZdZddZdS) _CallItemcCs||_||_||_||_dS)N)r3r/r0r1)r r3r/r0r1r r r rsz_CallItem.__init__N)rrrrr r r r r6sr6cs.eZdZdZdfdd ZfddZZS) _SafeQueuez=Safe Queue set exception to the future object linked to a jobrcs||_tj||ddS)N)ctx)pending_work_itemssuperr)r max_sizer8r9) __class__r r rsz_SafeQueue.__init__cslt|trZtt|||j}tdd||_ |j |j d}|dk rh|j |nt||dS)Nz """ {}"""r%) isinstancer6r&r'r( __traceback__r!formatrr,r9popr3r. set_exceptionr:_on_queue_feeder_error)r eobjr" work_item)r<r r rBs z!_SafeQueue._on_queue_feeder_error)r)rrr__doc__rrB __classcell__r r )r<r r7sr7cgs0t|}x"tt||}|s"dS|Vq WdS)z, Iterates over zip()ed iterables in chunks. N)ziptuple itertoolsislice) chunksize iterablesitchunkr r r _get_chunkss rPcsfdd|DS)z Processes a chunk of an iterable passed to map. Runs the function passed to map() on a chunk of the iterable passed to map. This function is run in a separate process. csg|] }|qSr r ).0r0)r/r r sz"_process_chunk..r )r/rOr )r/r _process_chunks rSc Cs^y|t|||dWn@tk rX}z"t||j}|t||dWdd}~XYnXdS)z.Safely send back the given result or exception)r5r4)r4N)putr2 BaseExceptionr$r>) result_queuer3r5r4rCr)r r r _sendback_results   rWc Cs|dk r:y ||Wn$tk r8tjjddddSXx|jdd}|dkrb|tdSy|j|j |j }Wn>tk r}z t ||j }t ||j|dWdd}~XYnXt ||j|d~qrWr3) call_queuerV initializerinitargsZ call_itemrrCr)r r r _process_workers$    "racCsxxr|rdSy|jdd}Wntjk r4dSX||}|jrh|jt||j|j |j ddq||=qqWdS)aMFills call_queue with _WorkItems from pending_work_items. This function never blocks. Args: pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids are consumed and the corresponding _WorkItems from pending_work_items are transformed into _CallItems and put in call_queue. call_queue: A multiprocessing.Queue that will be filled with _CallItems derived from _WorkItems. NF)rYT) ZfullrZqueueZEmptyr.Zset_running_or_notify_cancelrTr6r/r0r1)r9Zwork_idsr]r3rEr r r _add_call_item_to_queues   rcc sRdfdd}fdd}|j} |j} | | g} xt||ddD} t| | } d}d}| | kry| }d }Wqtk r}ztt|||j }Wdd}~XYqXn| | krd }d}| |rt|dk rd _ d_ dt d }|dk r td d |d|_x$|D]\}}|j|~q*W| xD]}|qXW|dSt|tr|st|}|s|dSnL|dk r||jd}|dk r|jr|j|jn|j|j~~||rFy$dk rd_ |s,|dSWntk rDYnXdq6WdS)a,Manages the communication between this process and the worker processes. This function is run in a local thread. Args: executor_reference: A weakref.ref to the ProcessPoolExecutor that owns this thread. Used to determine if the ProcessPoolExecutor has been garbage collected and that this function can exit. process: A list of the ctx.Process instances used as workers. pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). call_queue: A ctx.Queue that will be filled with _CallItems derived from _WorkItems for processing by the process workers. result_queue: A ctx.SimpleQueue of _ResultItems generated by the process workers. thread_wakeup: A _ThreadWakeup to allow waking up the queue_manager_thread from the main Thread and avoid deadlocks caused by permanently locked queues. NcstpdkpjS)N)r_shutdown_threadr )executorr r shutting_down?s z/_queue_management_worker..shutting_downc stddD}|}d}xn||kr|dkrxBt||D]2}yd|d7}Wq>tk rnPYq>Xq>WtddD}q WxD] }|qWdS)Ncss|]}|VqdS)N)is_alive)rQpr r r EszD_queue_management_worker..shutdown_worker..rrcss|]}|VqdS)N)rg)rQrhr r r riQs)sumvaluesrangeZ put_nowaitrrr)Zn_children_aliveZn_children_to_stopZn_sentinels_sentirh)r] processesr r shutdown_workerCs   z1_queue_management_worker..shutdown_workercSsg|] }|jqSr )sentinel)rQrhr r r rRhsz,_queue_management_worker..TFzKA child process terminated abruptly, the process pool is not usable anymorez^A process in the process pool was terminated abruptly while the future was running or pending.z ''' r%z''')r rcrkrZrecvrUr&r'r(r>r_brokenrdBrokenProcessPoolr!rr,rr.rAZ terminater=intAssertionErrorr@r3r4Z set_resultr5r)Zexecutor_referencernr9Zwork_ids_queuer]rVrrfroZ result_readerZ wakeup_readerZreadersZworker_sentinelsZreadycauseZ is_brokenZ result_itemrCZbper3rErhr )r]rernr _queue_management_worker!s  (        rvc Cshtrtrttdaytd}Wnttfk r:dSX|dkrHdS|dkrTdSd|attdS)NTSC_SEM_NSEMS_MAXz@system provides too few semaphores (%d available, 256 necessary))_system_limits_checked_system_limitedNotImplementedErrorr[sysconfAttributeError ValueError)Z nsems_maxr r r _check_system_limitssrccs.x(|D] }|x|r$|VqWqWdS)z Specialized implementation of itertools.chain.from_iterable. Each item in *iterable* should be a list. This function is careful not to keep references to yielded objects. N)reverser@)iterableZelementr r r _chain_from_iterable_of_listss rc@seZdZdZdS)rrzy Raised when a process in a ProcessPoolExecutor terminated abruptly while a future was in the running state. N)rrrrFr r r r rrsrrcsheZdZdddZddZddZd d Zejjj e_ dd d fd d Z dddZ ejj j e _ Z S)ProcessPoolExecutorNr cCst|dkr6tpd|_tjdkrntt|j|_n8|dkrHtdn tjdkrh|tkrhtdt||_|dkr~t }||_ |dk rt |st d||_||_d|_i|_d|_t|_d|_d|_i|_|jt}t||j |jd |_d |j_||_t |_!t"|_#dS) aSInitializes a new ProcessPoolExecutor instance. Args: max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. mp_context: A multiprocessing context to launch the workers. This object should provide SimpleQueue, Queue and Process. initializer: A callable used to initialize worker processes. initargs: A tuple of arguments to pass to the initializer. NrZwin32rz"max_workers must be greater than 0zmax_workers must be <= zinitializer must be a callableF)r;r8r9T)$rr[ cpu_count _max_workerssysplatformmin_MAX_WINDOWS_WORKERSrrZ get_context _mp_contextcallable TypeError _initializer _initargs_queue_management_thread _processesrd threadingZLock_shutdown_lockrq _queue_count_pending_work_itemsEXTRA_QUEUED_CALLSr7 _call_queueZ _ignore_epipeZ SimpleQueue _result_queuerbr _work_idsr_queue_management_thread_wakeup)r Z max_workersZ mp_contextr^r_Z queue_sizer r r rsF           zProcessPoolExecutor.__init__c Csv|jdkrr|jfdd}|tjtt|||j|j |j |j |j |jfdd|_d|j_ |j|jt|j<dS)NcSstjd|dS)Nz?Executor collected: triggering callback for QueueManager wakeup)rutildebugr)rrr r r weakref_cbAs zFProcessPoolExecutor._start_queue_management_thread..weakref_cbZQueueManagerThread)targetr0nameT)rr_adjust_process_countrZThreadrvweakrefrefrrrrrZdaemonstartr)r rr r r _start_queue_management_thread<s     z2ProcessPoolExecutor._start_queue_management_threadcCsTxNtt|j|jD]8}|jjt|j|j|j |j fd}| ||j|j <qWdS)N)rr0) rllenrrrZProcessrarrrrrpid)r rrhr r r rWs z)ProcessPoolExecutor._adjust_process_countc Ost|dkr|^}}}n>|s&tdn0d|krB|d}|^}}ntdt|d|j|jrnt|j|jr|tdtrtdt }t ||||}||j |j <|j|j |j d7_ |j||SQRXdS)NzEdescriptor 'submit' of 'ProcessPoolExecutor' object needs an argumentr/z6submit expected at least 1 positional argument, got %drz*cannot schedule new futures after shutdownz6cannot schedule new futures after interpreter shutdown)rrr@rrqrrrd RuntimeErrorrrZFuturer-rrrrTrrr)r0r1r r/fwr r r submitbs0        zProcessPoolExecutor.submitr)timeoutrLcs:|dkrtdtjtt|t|d|i|d}t|S)ajReturns an iterator equivalent to map(fn, iter). Args: fn: A callable that will take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time. Returns: An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order. Raises: TimeoutError: If the entire result iterator could not be generated before the given timeout. Exception: If fn(*args) raises for any values. rzchunksize must be >= 1.rL)r)rr:maprrSrPr)r r/rrLrMZresults)r<r r rs  zProcessPoolExecutor.mapTc Cs|j d|_WdQRX|jr6|j|r6|jd|_|jdk rd|j|r^|jd|_d|_ d|_ |jr|jd|_dS)NT) rrdrrrrrrZ join_threadrr)r rr r r shutdowns"      zProcessPoolExecutor.shutdown)NNNr )T) rrrrrrrrExecutorrFrrrGr r )r<r rs J !  r)NN)4rF __author__atexitr[Zconcurrent.futuresrrbrZmultiprocessingrZmultiprocessing.connectionrZmultiprocessing.queuesrrr functoolsrrJrr&WeakKeyDictionaryrrrrrr Exceptionr!r$r*objectr-r2r6r7rPrSrWrarcrvrzr{rrZBrokenExecutorrrrrregisterr r r r ,sV         (&! L