Source code for rdaclient

'''
RDA client classes. See rdaclient.Client's docstring for more information
'''

from multiprocessing import Process, Queue
from collections import deque
import signal
import ctypes as c
import socket
import logging
import time

import numpy as np

import rdadefs
import rdatools
import ringbuffer

__author__ = "Dmytro Bielievtsov"
__email__ = "belevtsoff@gmail.com"

[docs]class Client(object): ''' An asynchronous RDA (Remote Data Access) client with buffer. Spawns a child process for storing constantly incoming data in the background. Currently supports only float32 data type Parameters ---------- buffer_size : int, optional buffer capacity (in samples) buffer_window : int, optional buffer pocket size (in samples) Attributes ---------- is_streaming buffer_size data_dtype buffer_window start_msg : None or rda_msg_start_full_t a start message obtained from the server after the first start_streaming() call. Notes ----- The RDA data sharing is used by the BrainVision software. ''' def __init__(self, buffer_size=300000, buffer_window=1): self.logger = logging.getLogger('rdaclient') self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.__buf = ringbuffer.RingBuffer() self.__buffer_size = buffer_size self.__data_dtype = 'float32' # for now self.__buffer_window = buffer_window self.__streamer = None self.q = Queue() self.start_msg = None def __get_is_streaming(self): try: return self.__streamer.is_alive() except: return False is_streaming = property(__get_is_streaming, None, None, 'Checks whether the Streamer is active, read-only (bool)') buffer_size = property(lambda self: self.__buffer_size, None, None, 'Buffer capacity in samples, read-only (int)') data_dtype = property(lambda self: self.__data_dtype, None, None, 'Buffer\'s data type, read-only (string)') buffer_window = property(lambda self: self.__buffer_window, None, None, 'Buffer pocket size, read-only (in samples)') last_sample = property(lambda self: self.__buf.nSamplesWritten, None, None, 'Number of a last sample written to the buffer\ (= total no.)')
[docs] def connect(self, destaddr): ''' Connects to an RDA server Parameters ---------- destaddr : tuple server address ''' self.sock.connect(destaddr) self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
[docs] def start_streaming(self, timeout=10): ''' Starts data streaming from the server, using the following algorithm: 1. Waits until start/data message arrives or timeout is over 2. If start message arrived, initializes buffer 3. Spawns a background process for data streaming 4. Releases Parameters ---------- timeout : float, optional time to wait for a start message (in seconds) ''' if self.is_streaming: raise Exception('already streaming') self.logger.info('waiting for an rda start message...') hdr = rdadefs.rda_msg_hdr_t() then = time.time() now = time.time() while now - then < timeout: n = self.sock.recv_into(hdr) rdatools.check_received(n, hdr) if not rdatools.validate_rda_guid(hdr): self.logger.warning('packet with unknown GUID reveived') if hdr.nType == rdadefs.RDA_START_MSG: self.start_msg = rdatools.rda_read_start_msg(self.sock, hdr) self.logger.info('start message received, ' + \ rdatools.startmsg2string(self.start_msg)) break elif hdr.nType == rdadefs.RDA_FLOAT_MSG and self.start_msg: self.sock.recv(hdr.nSize - c.sizeof(hdr)) self.logger.info('trying to resume previous session...') break else: self.sock.recv(hdr.nSize - c.sizeof(hdr)) self.logger.info('skipped package (type = %s)' % hdr.nType) now = time.time() if not self.__buf.is_initialized: self.logger.info('initializing buffer...') self.__buf.initialize(int(self.start_msg.nChannels), self.buffer_size, self.buffer_window, self.data_dtype) self.logger.info('spawning a streamer process...') self.__streamer = Streamer(self.q, self.sock.fileno(), self.__buf.raw) self.__streamer._daemonic = True self.__streamer.start()
[docs] def stop_streaming(self, write_timelog=False): ''' Stops streaming by sending corresponding signal to a Streamer process Parameters ---------- write_timelog : bool, optional If True, streamer will write its timelog to a file before stopping ''' if not self.is_streaming: raise Exception('already stopped') self.q.put('stop') if write_timelog: self.q.put('save_timelog') self.__streamer.join() self.logger.info('stopped streaming')
[docs] def disconnect(self): ''' Disconnects the client from a server ''' self.sock.close()
[docs] def get_data(self, sampleStart, sampleEnd): ''' Gets the data from the buffer. If possible, the data is returned in the form of a numpy view on the corresponding chunk (without copy) Parameters ---------- sampleStart : int first sample index (included) sampleEnd : int last samples index (excluded) Returns ------- data : ndarray (view or copy) or None data chunk or None, if the data is not available ''' try: return self.__buf.get_data(sampleStart, sampleEnd) except: return None
[docs] def wait(self, sampleStart, sampleEnd, timeout=1, sleep=5e-4): ''' Gets the data from the buffer. Blocks if data is not available and releases if one of the following is true: 1. data is available 2. timeout is over 3. data is overwritten Parameters ---------- sampleStart : int first sample index (included) sampleEnd : int last samples index (excluded) timeout : float, optional timeout (seconds) sleep : float, optional time to wait until the next loop iteration. Used to avoid 100% processor loading. Returns ------- data : ndarray (view or copy) or None data chunk or None, if the data is overwritten or the timeout has expired ''' if not self.is_streaming: raise Exception('nothing to wait, start streaming first') then = time.time() now = time.time() while now - then < timeout: try: return self.__buf.get_data(sampleStart, sampleEnd) except ringbuffer.BufferError as e: if e.code != 3: # if the data is overwritten return None time.sleep(sleep) now = time.time() return None
[docs] def poll(self, nSamples, timeout=10, sleep=0.0005): ''' Gets the most resent data chunk from the buffer. Blocks until the next data block is written to the buffer or timeout is over. Parameters ---------- nSamples : int chunk size (in samples) timeout : float timeout (seconds) sleep : float time to wait until the next loop iteration. Used to avoid 100% processor loading. Returns ------- data : ndarray (view or copy) or None data chunk or None, if the data is overwritten or the timeout has expired ''' if not self.is_streaming: raise Exception('nothing to wait, start streaming first') ls = self.last_sample if self.wait(ls, ls + 1, timeout, sleep) is not None: ls = self.last_sample return self.get_data(ls - nSamples, ls) return None #------------------------------------------------------------------------------
[docs]class Streamer(Process): ''' A Streamer class. Inherited from the `multiprocessing.Process`. It is spawned by a Client to work in the background and receive the data. The buffer interface is initialized with a provided raw sharedctypes buffer array. Parameters ---------- q : Queue Queue object for interprocess communication fd : int socket file descriptor (the one which is connected to a server) raw : sharectypes char array: a raw sharedctypes buffer array. ''' def __init__(self, q, fd, raw): self.logger = logging.getLogger('data_streamer') self.sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) self.__buf = ringbuffer.RingBuffer() self.__buf.initialize_from_raw(raw) self.q = q self.timelog = deque(maxlen=100000) self.timelog_fname = 'streamer_timelog' # dictionary of known commands self.cmds = {'save_timelog' : self.__save_timelog} super(Streamer, self).__init__()
[docs] def run(self): ''' The main streaming loop. ''' cmd = self.__get_cmd() hdr = rdadefs.rda_msg_hdr_t() self.logger.info('started streaming') # Ignore Ctrl+C. The process is run in the daemonic mode, so if the # Client terminates, the streamer will be terminated anyway. This # ignoring however, allows for custom Ctrl+C handling in the # Client process to gracefully stop both the Client and the Streamer signal.signal(signal.SIGINT, signal.SIG_IGN) # stream until there's a stop command while cmd != 'stop': n = self.sock.recv_into(hdr) rdatools.check_received(n, hdr) # check for a proper packet ID if not rdatools.validate_rda_guid(hdr): self.logger.warning('packet with unknown GUID reveived') if hdr.nType == rdadefs.RDA_FLOAT_MSG: msg = rdatools.rda_read_data_msg(self.sock, hdr, self.__buf.nChannels) self.__put_datablock(msg) # skip the weird undocumented package elif hdr.nType == 10000: self.sock.recv(hdr.nSize - c.sizeof(hdr)) elif hdr.nType == rdadefs.RDA_STOP_MSG: self.logger.info('stop message received, stopping...') self.q.put('stop') time.sleep(.5) else: self.sock.recv(hdr.nSize - c.sizeof(hdr)) self.logger.info('skipped package (type = %s)' % hdr.nType) cmd = self.__get_cmd() self.logger.info('stopped streaming') # execute the last command before exiting cmd = self.__get_cmd() self.__execute_cmd(cmd)
def __put_datablock(self, msg): ''' Reshapes the data chunk and pushes it to the buffer Parameters ---------- msg : rda_msg_data_t data message ''' data = np.frombuffer(msg.fData, 'float32') self.__buf.put_data(np.reshape(data, (-1, self.__buf.nChannels))) self.timelog.append(time.time()) self.logger.debug('put data: rda block #%s, %s samples, time: %.3f' % (msg.nBlock, msg.nPoints, self.timelog[-1])) def __get_cmd(self): ''' Gets the command from the queue Returns ------- cmd : string or None, if there was no command ''' try: cmd = self.q.get(False) return cmd except Exception: return None def __execute_cmd(self, cmd): ''' Executes the command, if it's known Parameters ---------- cmd : string command ''' if self.cmds.has_key(cmd): try: self.cmds[cmd]() except: self.logger.warning('unable to execute command %s' % cmd) def __save_timelog(self): ''' Saves the timelog to a file. The timelog contains data package arriving times. May be useful for debugging and network setup ''' np.save(self.timelog_fname, np.array(self.timelog)) #------------------------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format='[%(process)-5d:%(threadName)-10s] %(name)s: %(levelname)s: %(message)s') if __name__ == '__main__': logging.basicConfig(level=logging.INFO) client = Client(buffer_size=300000, buffer_window=10) client.connect(('', 51244)) client.start_streaming() time.sleep(10) client.stop_streaming() client.disconnect()