Source code for rdatools
'''
Some useful tools to work with BrainVision RDA (Remote Data Access) API
'''
from ctypes import *
import numpy as np
import rdadefs as rda
__author__ = "Dmytro Bielievtsov"
__email__ = "belevtsoff@gmail.com"
def rda_read_start_msg(s, hdr):
    '''
    Reads an RDA start message from socket given the header

    Parameters
    ----------
    s : socket
        socket object
    hdr : rda_msg_hdr_t
        message header (already received); hdr.nSize is used as the
        size of the whole message in bytes

    Returns
    -------
    msg : rda_msg_start_full_t
        complete start message including variable fields

    Raises
    ------
    Exception
        if the fixed part of the message is not received in a single
        recv_into() call (raised by check_received)
    IOError
        if the connection is closed before the variable part of the
        message is completely received
    '''
    # allocate space for the whole message
    buf = (c_char * hdr.nSize)()

    # create a view on the fixed part, no copies
    msg_fixed = rda.rda_msg_start_t.from_buffer(buf)
    msg_fixed.hdr = hdr

    # another view, because there's no offset argument in recv_into():
    # it covers the fixed fields that follow the header
    rest = (c_char * (sizeof(msg_fixed) - sizeof(hdr)))\
        .from_buffer(buf, sizeof(hdr))
    n = s.recv_into(rest)
    check_received(n, rest)

    # whatever is left after the fixed part and the per-channel doubles
    # is treated as the string area
    nChannels = msg_fixed.nChannels
    stringLength = (sizeof(buf) - sizeof(msg_fixed)
                    - nChannels * sizeof(c_double))

    # create new type including variable fields
    rda_msg_start_full_t = rda.rda_msg_start_t.full(nChannels, stringLength)

    # receive the rest
    # this may be a large block, so recv_into() may return before all
    # the data has arrived. Loop until everything is received.
    varLength = rda_msg_start_full_t.varLength
    n = 0
    while n < varLength:
        msg_var = (c_char * (varLength - n)) \
            .from_buffer(buf, sizeof(msg_fixed) + n)
        received = s.recv_into(msg_var)
        if received == 0:
            # recv_into() returning 0 means the peer closed the
            # connection; without this check the loop would spin forever
            raise IOError('Connection closed while receiving start '
                          'message, received %s of %s bytes'
                          % (n, varLength))
        n += received

    return rda_msg_start_full_t.from_buffer(buf)
def rda_read_data_msg(s, hdr, nChannels):
    '''
    Reads an RDA data message from socket given the header

    Parameters
    ----------
    s : socket
        socket object
    hdr : rda_msg_hdr_t
        message header (already received); hdr.nSize is used as the
        size of the whole message in bytes
    nChannels : int
        number of channels (from start message)

    Returns
    -------
    msg : rda_msg_data_full_t
        complete data message including variable fields

    Raises
    ------
    Exception
        if the fixed part of the message is not received in a single
        recv_into() call (raised by check_received)
    IOError
        if the connection is closed before the variable part of the
        message is completely received
    '''
    # allocate space for the whole message
    buf = (c_char * hdr.nSize)()

    # create a view on the fixed part, no copies
    msg_fixed = rda.rda_msg_data_t.from_buffer(buf)
    msg_fixed.hdr = hdr

    # another view, because there's no offset argument in recv_into():
    # it covers the fixed fields that follow the header
    rest = (c_char * (sizeof(msg_fixed) - sizeof(hdr)))\
        .from_buffer(buf, sizeof(hdr))
    n = s.recv_into(rest)
    check_received(n, rest)

    # whatever is left after the fixed part and the float samples
    # (nChannels * nPoints) is treated as the markers area
    nPoints = msg_fixed.nPoints
    markersLength = (sizeof(buf) - sizeof(msg_fixed)
                     - nChannels * nPoints * sizeof(c_float))

    # create new type including variable fields
    rda_msg_data_full_t = rda.rda_msg_data_t.full(nChannels, nPoints,
                                                  markersLength)

    # receive the rest
    # this may be a large block, so recv_into() may return before all
    # the data has arrived. Loop until everything is received.
    varLength = rda_msg_data_full_t.varLength
    n = 0
    while n < varLength:
        msg_var = (c_char * (varLength - n)) \
            .from_buffer(buf, sizeof(msg_fixed) + n)
        received = s.recv_into(msg_var)
        if received == 0:
            # recv_into() returning 0 means the peer closed the
            # connection; without this check the loop would spin forever
            raise IOError('Connection closed while receiving data '
                          'message, received %s of %s bytes'
                          % (n, varLength))
        n += received

    return rda_msg_data_full_t.from_buffer(buf)
def startmsg2string(msg):
    '''
    Builds a human-readable summary of an RDA start message

    The summary starts with a line giving the channel count, followed
    by one "<name>: <resolution> uV" line per channel.

    Parameters
    ----------
    msg : rda_msg_start_full_t
        RDA start message

    Returns
    -------
    string
    '''
    header = '%d channels: \n' % msg.nChannels

    # channel names are stored back to back, separated by NUL characters
    names = ubyte2string(msg.sChannelNames).split('\x00')
    # view the resolutions field as an array of doubles
    resolutions = np.frombuffer(msg.dResolutions, dtype=np.double)

    # zip() stops at the shorter sequence, so any extra entries produced
    # by the split are ignored
    rows = [name + ': %s uV\n' % res
            for name, res in zip(names, resolutions)]
    return header + ''.join(rows)
def ubyte2string(array):
    '''
    Turns a sequence of byte values into a string, one character per
    element (each element is passed through chr())

    Parameters
    ----------
    array : ctypes byte array
        input array

    Returns
    -------
    string
    '''
    characters = map(chr, array)
    return ''.join(characters)
def check_received(n, msg):
    '''
    Checks whether the message is completely received. If OK, returns
    nothing, else raises an exception

    Parameters
    ----------
    n : int
        number of bytes received (returned by socket.recv_into())
    msg : ctypes structure
        message received; its sizeof() is the expected byte count

    Raises
    ------
    Exception
        if n differs from sizeof(msg)
    '''
    expected = sizeof(msg)
    if n != expected:
        # Format the complete message in one step: '%' binds tighter than
        # '+', so formatting only the second half of a concatenated string
        # with both values raises TypeError instead of this exception.
        raise Exception('Failed to receive packet, received %s bytes, '
                        'should be %s bytes' % (n, expected))
def validate_rda_guid(hdr):
    '''
    Compares the GUID stored in a message header against rda.RDA_GUID,
    element by element

    Parameters
    ----------
    hdr : rda_msg_hdr_t
        message header

    Returns
    -------
    result : bool
        True if no compared pair of elements differs, False otherwise
    '''
    pairs = zip(hdr.guid, rda.RDA_GUID)
    # any() stops at the first mismatching pair
    return not any(actual != expected for actual, expected in pairs)