'''
Provides a two-dimensional circular buffer with homogeneous elements
See other classes' docstrings for more information:
* `RingBuffer`: the buffer
* `datatypes`: supported datatypes
* `BufferHeader`: header structure
* `BufferError`: error definition
'''
from multiprocessing import Array
import ctypes as c
import logging
import numpy as np
__author__ = "Dmytro Bielievtsov"
__email__ = "belevtsoff@gmail.com"
class RingBuffer(object):
    '''
    Provides a two-dimensional circular buffer with homogeneous elements

    The buffer can be used simultaneously by several processes, because
    both data and metadata are stored in a single sharedctypes byte array.
    First, the buffer object is created and initialized in one of the
    processes. Second, its raw array is shared with others. Third, those
    processes create their own RingBuffer objects and initialize them
    so that they all point to the same shared raw array
    (see initialize_from_raw method's docstring).

    Attributes
    ----------
    is_initialized
    nChannels
    bufSize
    pocketSize
    nptype
    raw
    writePtr
    nSamplesWritten

    See Also
    --------
    initialize: allocate new buffer
    initialize_from_raw: use another buffer's raw array

    Notes
    -----
    The buffer consists of a buffer interface (self) and the raw
    sharedctypes byte array which has header, data, and pocket subsections
    (self.raw).

    1. header section
        Contains the metadata such as size of the sections, current write
        pointer, datatype, number of channels (number of columns) and total
        number of samples (not bytes) written
    2. data section
        Contains the actual data in the buffer. When the write pointer
        reaches the end of the section it jumps to the beginning
        overwriting the old data.
    3. pocket section
        The pocket section always contains the same data as the leftmost
        such chunk of the data section. This is done to avoid data copies
        when reading a data chunk (up to the size of the pocket) first part
        of which happens to be located at the end of the data section while
        the second - already in the beginning. This might be useful when
        reading the data with a sliding window.
    '''

    def __init__(self):
        self.logger = logging.getLogger('ringbuffer')
        self.__initialized = False

    def __str__(self):
        '''
        __str__ overload for printing buffer contents (data section first,
        then the pocket, both transposed)
        '''
        return str(self.__buf[:self.bufSize].T) + '\n' + str(self.__pocket.T)

    def __getattr__(self, name):
        '''
        __getattr__ overload to avoid accessing buffer attributes before
        it's initialized. Only called when the normal lookup has failed.

        Parameters
        ----------
        name : string
            name of the attribute

        Returns
        -------
        requested attribute, if possible

        Raises
        ------
        BufferError
            If the buffer is not initialized
        AttributeError
            If the buffer is initialized but has no such attribute
        '''
        # The flag is read straight from the instance dict: going through
        # `self.__initialized` would re-enter __getattr__ whenever the flag
        # itself is missing (e.g. __init__ was never run) and recurse
        # without bound.
        if self.__dict__.get('_RingBuffer__initialized', False):
            return object.__getattribute__(self, name)
        raise BufferError(1)

    #--------------------------------------------------------------------------
    # Properties

    # number of written samples (stored in the shared header)
    def __get_nsamples(self):
        return self.__hdr.nSamplesWritten

    def __set_nsamples(self, value):
        self.__hdr.nSamplesWritten = value

    nSamplesWritten = property(__get_nsamples, __set_nsamples, None,
                               'Total number of samples written so far (int)')

    # read-only attributes
    writePtr = property(lambda self: self.nSamplesWritten % self.bufSize, None, None,
                        'Current write pointer position, read-only (int)')
    is_initialized = property(lambda self: self.__initialized, None, None,
                              'Indicates whether the buffer is initialized, read-only (bool)')
    raw = property(lambda self: self.__raw, None, None,
                   'Raw buffer array, read-only (sharedctypes, char)')
    nChannels = property(lambda self: self.__nChannels, None, None,
                         'Dimensionality of a sample, read-only (int)')
    bufSize = property(lambda self: self.__bufSize, None, None,
                       'The buffer capacity in samples, read-only (int)')
    pocketSize = property(lambda self: self.__pocketSize, None, None,
                          'Size of the buffer pocket in samples, read-only (int)')
    nptype = property(lambda self: self.__nptype, None, None,
                      'The type of the data in the buffer, read-only (string)')

    #--------------------------------------------------------------------------

    def initialize(self, nChannels, nSamples, windowSize=1, nptype='float32'):
        '''
        Initializes the buffer with a new raw array

        Parameters
        ----------
        nChannels : int
            dimensionality of a single sample
        nSamples : int
            the buffer capacity in samples
        windowSize : int, optional
            the size of the window to be used for reading the data. A
            pocket of this size will be created
        nptype : string, optional
            the type of the data to be stored
        '''
        # checking parameters (non-positive values are clamped to 1)
        if nChannels < 1:
            self.logger.warning('nChannels must be a positive integer, setting to 1')
            nChannels = 1
        if nSamples < 1:
            self.logger.warning('nSamples must be a positive integer, setting to 1')
            nSamples = 1
        if windowSize < 1:
            self.logger.warning('windowSize must be a positive integer, setting to 1')
            windowSize = 1

        # initializing: header, then data section, then pocket section
        sampleBytes = nChannels * np.dtype(nptype).itemsize
        sizeBytes = c.sizeof(BufferHeader) + (nSamples + windowSize) * sampleBytes
        raw = Array('c', sizeBytes)

        hdr = BufferHeader.from_buffer(raw.get_obj())
        hdr.bufSizeBytes = nSamples * sampleBytes
        hdr.pocketSizeBytes = windowSize * sampleBytes
        hdr.dataType = datatypes.get_code(nptype)
        hdr.nChannels = nChannels
        hdr.nSamplesWritten = 0

        # the buffer is flagged as initialized only once this succeeds
        self.initialize_from_raw(raw.get_obj())

    def initialize_from_raw(self, raw):
        '''
        Initializes the buffer with the compatible external raw array. All
        the metadata will be read from the header region of this array.

        Parameters
        ----------
        raw : sharedctypes char array
            the raw array to initialize with
        '''
        hdr = BufferHeader.from_buffer(raw)

        # datatype
        nptype = datatypes.get_type(hdr.dataType)
        itemsize = np.dtype(nptype).itemsize

        bufOffset = c.sizeof(hdr)
        pocketOffset = bufOffset + hdr.bufSizeBytes
        # floor division: np.frombuffer needs integer element counts
        bufSizeFlat = hdr.bufSizeBytes // itemsize
        pocketSizeFlat = hdr.pocketSizeBytes // itemsize
        shape = (-1, hdr.nChannels)

        # create numpy view objects pointing to the raw array
        self.__raw = raw
        self.__hdr = hdr
        self.__buf = np.frombuffer(raw, nptype, bufSizeFlat, bufOffset).reshape(shape)
        self.__pocket = np.frombuffer(raw, nptype, pocketSizeFlat,
                                      pocketOffset).reshape(shape)
        # The pocket starts right where the data section ends, so one view
        # can span both; it serves chunks that run past the end of the data
        # section (a slice of __buf alone would be clipped at bufSize).
        self.__ext = np.frombuffer(raw, nptype, bufSizeFlat + pocketSizeFlat,
                                   bufOffset).reshape(shape)

        # helper variables
        self.__nChannels = hdr.nChannels
        self.__bufSize = len(self.__buf)
        self.__pocketSize = len(self.__pocket)
        self.__nptype = nptype

        # set last, so that a failure above does not leave the object
        # claiming to be initialized
        self.__initialized = True

    def __get_local_idx(self, startIdx, endIdx, nocheck=False):
        '''
        Checks for availability of the requested chunk and returns local
        indices. If the requested chunk is non-contiguous, uses the pocket;
        if the pocket is too small uses slow (copy) mode and issues a
        corresponding warning

        Parameters
        ----------
        startIdx : int
            chunk start index (in samples)
        endIdx : int
            chunk end index (in samples)
        nocheck : bool, optional
            if True, the availability check is skipped

        Returns
        -------
        idx : tuple or list
            a ``(start, stop)`` tuple to be used as a slice over the data
            section followed by the pocket (``stop`` may exceed `bufSize`
            by at most `pocketSize`), or a list of row indices into the
            data section in slow mode

        Raises
        ------
        BufferError
            If requested samples are not available and `nocheck` is not set
        '''
        # availability check
        if not nocheck:
            e = self.check_availablility(startIdx, endIdx)
            if e:
                raise BufferError(e)

        chunkSize = endIdx - startIdx
        localStartIdx = startIdx % self.bufSize
        # end position counted from the start of the data section; values
        # above bufSize mean the chunk wraps around
        localEndIdx = localStartIdx + chunkSize

        # contiguous chunk (includes the whole buffer, a chunk ending
        # exactly at the end of the data section, and an empty chunk)
        if localEndIdx <= self.bufSize:
            return localStartIdx, localEndIdx

        # split chunk, using pocket
        if chunkSize <= self.pocketSize:
            if chunkSize != self.pocketSize:
                self.logger.info('buffer: buffer pocket is larger than the window size')
            return localStartIdx, localEndIdx

        # split chunk, can't use pocket
        self.logger.warning('buffer: slow mode')
        return (list(range(localStartIdx, self.bufSize)) +
                list(range(localEndIdx % self.bufSize)))

    def __write_buffer(self, data, idx):
        '''
        Writes data to buffer and keeps the pocket equal to the leftmost
        part of the data section.

        Parameters
        ----------
        data : ndarray
            properly shaped numpy array
        idx : tuple or list
            local indices, returned by the __get_local_idx method
        '''
        if isinstance(idx, tuple):
            # slicing (contiguous chunk, possibly running into the pocket)
            i, j = idx
            self.__ext[i:j] = data

            # rows that landed in the pocket belong at the start of the
            # data section as well
            wrapped = j - self.bufSize
            if wrapped > 0:
                self.__buf[:wrapped] = self.__pocket[:wrapped]

            # rows written into the part of the data section mirrored by
            # the pocket have to be copied to the pocket
            mirrorEnd = min(j, self.bufSize, self.pocketSize)
            if i < mirrorEnd:
                self.__pocket[i:mirrorEnd] = self.__buf[i:mirrorEnd]
        else:
            # advanced indexing (the pocket is too small)
            self.__buf[idx] = data
            n = min(self.bufSize, self.pocketSize)
            self.__pocket[:n] = self.__buf[:n]

    def __read_buffer(self, idx):
        '''
        Reads the data from buffer

        Parameters
        ----------
        idx : tuple or list
            local indices, returned by the __get_local_idx method

        Returns
        -------
        numpy view on the requested chunk for a tuple `idx`, a copy of the
        requested rows for a list `idx` (ndarray)
        '''
        if isinstance(idx, tuple):
            i, j = idx
            return self.__ext[i:j]
        return self.__buf[idx]

    def check_availablility(self, sampleStart, sampleEnd):
        '''
        Checks whether the requested data samples are available.

        Parameters
        ----------
        sampleStart : int
            first sample index (included)
        sampleEnd : int
            last samples index (excluded)

        Returns
        -------
        0
            if the data is available
        2
            if (part of) the data is already overwritten
        3
            if (part of) the data is not yet in the buffer
        5
            if `sampleStart` is negative or `sampleEnd` is not positive
        '''
        if sampleStart < 0 or sampleEnd <= 0:
            return 5
        if sampleEnd > self.nSamplesWritten:
            return 3  # data is not ready
        if (self.nSamplesWritten - sampleStart) > self.bufSize:
            return 2  # data is already erased
        return 0

    def get_data(self, sampleStart, sampleEnd, wprotect=True):
        '''
        Gets the data from the buffer. If possible, the data is returned
        in the form of a numpy view on the corresponding chunk (without
        copy). If the data is not available, raises an exception

        Parameters
        ----------
        sampleStart : int
            first sample index (included)
        sampleEnd : int
            last samples index (excluded)
        wprotect : bool, optional
            protect returned views from occasional writes

        Returns
        -------
        data : data chunk (numpy view or numpy array)

        Raises
        ------
        BufferError
            If the data is not available
        '''
        idx = self.__get_local_idx(sampleStart, sampleEnd)
        data = self.__read_buffer(idx)
        data.setflags(write=not wprotect)
        return data

    def put_data(self, data):
        '''
        Pushes the data to the buffer. If more samples are given than the
        buffer can hold, only the last `bufSize` of them are stored, but
        all of them are counted in `nSamplesWritten`.

        Parameters
        ----------
        data : ndarray
            properly shaped numpy array: (nSamples, nChannels), or
            one-dimensional if the buffer has a single channel

        Raises
        ------
        BufferError
            If the shape of the data does not match the buffer (code 4)
        '''
        data = np.asarray(data)
        if data.ndim == 1:
            if self.nChannels != 1:
                raise BufferError(4)
            rows = data.reshape((len(data), 1))
        elif data.ndim == 2:
            if data.shape[1] != self.nChannels:
                raise BufferError(4)
            rows = data
        else:
            raise BufferError(4)

        nNew = len(rows)
        sampleEnd = self.nSamplesWritten + nNew
        if nNew > self.bufSize:
            # only the tail of the data fits into the buffer
            sampleStart = sampleEnd - self.bufSize
        else:
            sampleStart = self.nSamplesWritten

        idx = self.__get_local_idx(sampleStart, sampleEnd, nocheck=True)
        self.__write_buffer(rows[nNew - (sampleEnd - sampleStart):], idx)
        # the counter is advanced only after the samples are in place
        self.nSamplesWritten += nNew
class datatypes(object):
    '''
    A helper class to interpret the typecode read from buffer header.
    To add new supported datatypes, add them to the 'types' dictionary
    '''
    # typecode (as stored in the buffer header) -> numpy datatype name
    types = {0: 'float32',
             1: 'int16'}

    @classmethod
    def get_code(cls, type):
        '''
        Gets buffer typecode given numpy datatype

        Parameters
        ----------
        type : string
            numpy datatype (e.g. 'float32')

        Returns
        -------
        code : int
            typecode of the datatype

        Raises
        ------
        ValueError
            If the datatype is not in the 'types' dictionary
        '''
        # Scan the pairs directly rather than indexing keys() with a
        # position found in values(): dictionary views cannot be indexed
        # on Python 3.
        for code, name in cls.types.items():
            if name == type:
                return code
        raise ValueError('%r is not a supported datatype' % (type,))

    @classmethod
    def get_type(cls, code):
        '''
        Gets numpy datatype given a buffer typecode

        Parameters
        ----------
        code : int
            typecode (e.g. 0)

        Returns
        -------
        type : string
            numpy datatype name

        Raises
        ------
        KeyError
            If the typecode is not in the 'types' dictionary
        '''
        return cls.types[code]
class BufferError(Exception):
    '''
    Represents different types of buffer errors

    Attributes
    ----------
    code : int
        error code: 1 - buffer is not initialized, 2 and 3 - unable to get
        indices, 4 - writing incompatible data, 5 - negative index
    '''

    def __init__(self, code):
        '''
        Initializes a BufferError with given error code

        Parameters
        ----------
        code : int
            error code
        '''
        # Passing the code on to Exception fills in `args`; without it the
        # exception cannot be re-created when unpickled (e.g. when it is
        # passed between processes).
        Exception.__init__(self, code)
        self.code = code

    def __str__(self):
        '''
        Returns a description of the error followed by its code
        '''
        if self.code == 1:
            return 'buffer is not initialized (error %s)' % repr(self.code)
        elif self.code in [2, 3]:
            return 'unable to get indices (error %s)' % repr(self.code)
        elif self.code == 4:
            return 'writing incompatible data (error %s)' % repr(self.code)
        elif self.code == 5:
            return 'negative index (error %s)' % repr(self.code)
        else:
            return '(error %s)' % repr(self.code)
if __name__ == '__main__':
    # Demo: two buffer objects attached to one raw array see the same data.
    buf1 = RingBuffer()
    buf2 = RingBuffer()

    buf1.initialize(2, 15, 3)
    buf2.initialize_from_raw(buf1.raw)

    buf1.put_data(np.array([[1, 2], [3, 4]]))
    buf2.put_data(np.array([[5, 6], [7, 8]]))

    print(buf1)
    print(buf2)

    # Views returned by get_data are read-only by default, so write
    # protection has to be switched off before modifying the chunk.
    dat = buf2.get_data(1, 4, wprotect=False)
    dat[0, 0] = 100

    print(buf1)
    print(buf2)