Python's multiprocessing en geheugen

Ik gebruik multiprocessing.imap_unordered om een ​​berekening uit te voeren op een lijst met waarden:

def process_parallel(fnc, some_list):
    pool = multiprocessing.Pool()
    for result in pool.imap_unordered(fnc, some_list):
        for x in result:
            yield x
    pool.terminate()

Elke aanroep naar fnc retourneert een ENORP-object als resultaat, per ontwerp. Ik kan N-instanties van een dergelijk object opslaan in RAM, waarbij N ~ cpu_count, maar niet veel meer (niet honderden).

Het gebruik van deze functie kost nu te veel geheugen. Het geheugen wordt volledig besteed in het hoofdproces, niet in de werkers.

Hoe slaan imap_unordered de voltooide resultaten op? Ik bedoel de resultaten die al door de werknemers zijn teruggestuurd maar nog niet zijn doorgegeven aan de gebruiker. Ik vond het slim en berekende ze alleen 'lui' zoals nodig, maar blijkbaar niet.

Het lijkt erop dat, omdat ik de resultaten van process_parallel niet snel genoeg kan gebruiken, de pool deze grote objecten uit de wachtrij plaatst van fnc ergens, intern, en dan opblaast. Is er een manier om dit te voorkomen? Zijn interne wachtrij op de een of andere manier beperken?


Ik gebruik Python2.7. Proost.

19
@FelixBonkoski Nee, fnc neemt een enkel item van some_list en berekent en retourneert er een enorm object van.
toegevoegd de auteur user124114, de bron
Goed van wat ik zie opbrengst is in het hoofdproces, niet in fnc (dwz de functie die door de werknemers wordt uitgevoerd). is fnc zelf bezig met luie evaluatie?
toegevoegd de auteur Felix, de bron
Beoordeel alleen de limiet op basis van het beschikbare geheugen.
toegevoegd de auteur Eric des Courtis, de bron

2 antwoord

Zoals u kunt zien aan de hand van het bijbehorende bronbestand ( python2.7/multiprocessing/pool.py ), gebruikt de IMapUnorderedIterator een collections.deque -instantie voor het opslaan van de resultaten. Als een nieuw item binnenkomt, wordt het toegevoegd en verwijderd in de iteratie.

Zoals je al zei, als er nog een enorm object binnenkomt terwijl de hoofdthread nog steeds het object verwerkt, worden die ook in het geheugen opgeslagen.

Wat je zou kunnen proberen, is zoiets als dit:

it = pool.imap_unordered(fnc, some_list)
for result in it:
    it._cond.acquire()
    for x in result:
        yield x
    it._cond.release()

Dit zou ervoor moeten zorgen dat de taak-resultaat-ontvanger-thread wordt geblokkeerd terwijl je een item verwerkt als het probeert om het volgende object in de deque te plaatsen. Er mogen dus niet meer dan twee van de enorme objecten in het geheugen zijn. Als dat voor uw geval werkt, weet ik het niet;)

10
toegevoegd
Ik volg dit voorbeeld niet, is it niet zomaar een generator en heeft daarom geen _cond.acquire() en release methoden? Als u ze zelf moet schrijven, wat voor soort object moet ._ cond dan zijn?
toegevoegd de auteur Hooked, de bron
@Hooked: imap_unordered retourneert een IMapUnorderedIterator , die deze functies heeft zoals kan worden gezien door een blik in de bijbehorende broncode. Omdat de resultaat-ontvanger-thread (na ontvangst van een resultaat) het slot nodig heeft om het resultaat in de deque in te voeren, blokkeert dit de thread en voorkomt deze meer geheugen.
toegevoegd de auteur rumpel, de bron
@EricdesCourtis: de beperking lijkt niet het aantal uitgevoerde taken dat al door pool wordt beheerd, maar de resultaatgrootte die de hoofdthread niet tegelijkertijd in het geheugen kan bevatten.
toegevoegd de auteur rumpel, de bron
@EricdesCourtis: Ik weet niet zeker of ik je volledig volg. Kan het zijn dat je de hoofddraad verwart met de resultaat-ontvangende thread? De laatste is de relevante die moeilijker te controleren is omdat deze voor de gebruiker nogal transparant is.
toegevoegd de auteur rumpel, de bron
Klinkt als de gebruiker om prestaties geeft, waarom zou je hem beperken tot een klein aantal met een eenvoudig slot?
toegevoegd de auteur Eric des Courtis, de bron
Ja, ik begreep dat dat is waarom ik zei dat een semafoor gedeclareerd vanuit de hoofddraad een betere keuze zou zijn omdat je meer algemeen systeemgebruik zou krijgen zonder het hele geheugen opgebruikt te hebben.
toegevoegd de auteur Eric des Courtis, de bron

De eenvoudigste oplossing die ik kan bedenken is om een ​​afsluiting toe te voegen om je fnc -functie in te pakken die een semafoor zou gebruiken om het totale aantal gelijktijdige uitvoeringen van taken dat tegelijk kan worden uitgevoerd te regelen (ik neem aan dat proces/thread zou de semafoor verhogen). De semafoorwaarde kan worden berekend op basis van de taakgrootte en het beschikbare geheugen.

2
toegevoegd