Python class missing attribute that was initialized?

I am trying to create a class, based on the multiprocessing Queue, that keeps a reliable count of the items it holds:

import multiprocessing
from multiprocessing import Value
from multiprocessing.queues import Queue

class SharedCounter(object):
    """ A synchronized shared counter.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/

    """

    def __init__(self, n = 0):
        self.count = Value('i', n)

    def increment(self, n = 1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value


class CounterQueue(Queue):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().

    """

    def __init__(self, *args, **kwargs):
        self.size = SharedCounter(0)
        super(CounterQueue, self).__init__(ctx=multiprocessing.get_context(), *args, **kwargs)

    def put(self, *args, **kwargs):
        self.size.increment(1)
        super(CounterQueue, self).put(*args, **kwargs)

    def get(self, *args, **kwargs):
        self.size.increment(-1)
        return super(CounterQueue, self).get(*args, **kwargs)

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()

    def clear(self):
        """ Remove all elements from the Queue. """
        while not self.empty():
            self.get()

However, it seems like when I try and pass this object as an argument into another process,

for i in range(len(multiples)):
    res_queues.append(CounterQueue())

process = mp.Process(name="test",
                     target=function,
                     args=(res_queues))

process.daemon = True

process.start()

I get an AttributeError when put() is called in the child process: AttributeError: 'CounterQueue' object has no attribute 'size'. However, I’ve confirmed the class itself is correct, since the following code runs without issue in a single process:

>>> from python.multiprocessing.queue import CounterQueue
>>> a = CounterQueue()
>>> a.put(1)
>>> a.qsize()
1

I’m wondering whether I’m missing something Python-specific here.

Solution:

I can reason about it, though I’m not sure this is a complete answer:
what is likely happening is that, for multiprocessing.Queue to work properly (that is, to act as a queue endpoint when passed to another process), it has to customize its serialization and deserialization (pickling). As a consequence, deserializing an mp.Queue subclass does not preserve the extra instance attributes the subclass adds, such as .size.

For reference, here is mp.Queue.__getstate__ (from ~/.pyenv/versions/3.12.0/lib/python3.12/multiprocessing/queues.py, line 58):

def __getstate__(self):
    context.assert_spawning(self)
    return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
            self._rlock, self._wlock, self._sem, self._opid)

As you can see, it serializes only that fixed set of attributes, and .size is not among them.
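To see the mechanism in isolation, here is a minimal sketch that uses plain pickle instead of multiprocessing. FixedStateBase and CountingSubclass are hypothetical stand-ins: the base class mimics Queue by pickling a fixed tuple in __getstate__ and restoring only that tuple in __setstate__. Unpickling creates the object without calling __init__, so whatever the subclass set there is simply missing from the copy. Under the spawn start method (the default on Windows and, since Python 3.8, on macOS), the arguments given to a Process go through this same pickle round trip.

```python
import pickle


class FixedStateBase:
    """Hypothetical stand-in for multiprocessing.Queue's pickling behaviour."""

    def __init__(self, maxsize=0):
        self._maxsize = maxsize

    def __getstate__(self):
        # Like Queue.__getstate__: return only the fields the base class knows about.
        return (self._maxsize,)

    def __setstate__(self, state):
        # Like Queue.__setstate__: restore exactly those fields, nothing else.
        (self._maxsize,) = state


class CountingSubclass(FixedStateBase):
    """Hypothetical stand-in for CounterQueue: adds an attribute in __init__."""

    def __init__(self, maxsize=0):
        self.size = 0
        super().__init__(maxsize)


original = CountingSubclass(5)
# Unpickling calls CountingSubclass.__new__ and then __setstate__; __init__ is not run.
restored = pickle.loads(pickle.dumps(original))

print(hasattr(original, "size"))  # True
print(hasattr(restored, "size"))  # False
print(restored._maxsize)          # 5
```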

The most transparent workaround for your case is probably composition instead of inheritance: your queue class should "have" a multiprocessing.Queue as an attribute, alongside the .size attribute, and proxy all the Queue methods you need from your class to the underlying queue.
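A minimal sketch of that composition approach, assuming you only need put(), get(), qsize(), empty() and clear(); other Queue methods such as put_nowait(), close() and join_thread() would have to be proxied the same way. Because this CounterQueue is an ordinary class with no custom __getstate__, pickling it falls back to its instance __dict__, and the wrapped queue and the shared Value are each pickled on their own while the child process is being spawned. The counter ordering (increment before put, decrement after a successful get) is a choice made for this sketch so that a failed or timed-out call does not skew the count.

```python
import multiprocessing


class SharedCounter:
    """Process-safe integer counter backed by multiprocessing.Value."""

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        # Hold the Value's own lock so the read-modify-write is atomic.
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        return self.count.value


class CounterQueue:
    """Has-a multiprocessing.Queue instead of being one.

    No custom __getstate__ is defined, so pickling uses the instance
    __dict__ and both _queue and size survive the trip to a child process.
    """

    def __init__(self, maxsize=0):
        self._queue = multiprocessing.Queue(maxsize)
        self.size = SharedCounter(0)

    def put(self, obj, block=True, timeout=None):
        # Count first, so the counter never lags behind an item a consumer can get.
        self.size.increment(1)
        try:
            self._queue.put(obj, block, timeout)
        except Exception:
            # e.g. queue.Full with block=False: undo the increment, then re-raise.
            self.size.increment(-1)
            raise

    def get(self, block=True, timeout=None):
        # Decrement only after a successful get, so a blocked or timed-out
        # get (queue.Empty) does not lower the count early.
        item = self._queue.get(block, timeout)
        self.size.increment(-1)
        return item

    def qsize(self):
        return self.size.value

    def empty(self):
        return self.qsize() == 0

    def clear(self):
        """Remove all elements by draining the wrapped queue."""
        while not self.empty():
            self.get()
```

To use it, create the queues under an `if __name__ == "__main__":` guard and pass them to mp.Process as usual; the child then sees a CounterQueue whose .size still refers to the same shared Value.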
