import sys import unittest import os import tempfile import numpy from numpy import * from numpy import rec as records from tables import * from tables.tests import common from tables.tests.common import allequal # To delete the internal attributes automagically unittest.TestCase.tearDown = common.cleanup # It is important that columns are ordered according to their names # to ease the comparison with record arrays. # Test Record class class Record(IsDescription): var0 = StringCol(itemsize=4, dflt="", shape=2) # 4-character string array var1 = StringCol(itemsize=4, dflt=["abcd","efgh"], shape=(2,2)) var1_= IntCol(dflt=((1,1),), shape=2) # integer array var2 = IntCol(dflt=((1,1),(1,1)), shape=(2,2)) # integer array var3 = Int16Col(dflt=2) # short integer var4 = FloatCol(dflt=3.1) # double (double-precision) var5 = Float32Col(dflt=4.2) # float (single-precision) var6 = UInt16Col(dflt=5) # unsigned short integer var7 = StringCol(itemsize=1, dflt="e") # 1-character String # Dictionary definition RecordDescriptionDict = { 'var0': StringCol(itemsize=4, dflt="", shape=2), # 4-character string array 'var1': StringCol(itemsize=4, dflt=["abcd","efgh"], shape=(2,2)), # 'var0': StringCol(itemsize=4, shape=2), # 4-character String # 'var1': StringCol(itemsize=4, shape=(2,2)), # 4-character String 'var1_':IntCol(shape=2), # integer array 'var2': IntCol(shape=(2,2)), # integer array 'var3': Int16Col(), # short integer 'var4': FloatCol(), # double (double-precision) 'var5': Float32Col(), # float (single-precision) 'var6': Int16Col(), # unsigned short integer 'var7': StringCol(itemsize=1), # 1-character String } # Record class with numpy dtypes (mixed shapes is checkd here) class RecordDT(IsDescription): var0 = Col.from_dtype(numpy.dtype("2S4"), dflt="") # shape in dtype var1 = Col.from_dtype(numpy.dtype(("S4", (2, 2))), dflt=["abcd","efgh"]) # shape is a mix var1_= Col.from_dtype(numpy.dtype("2i4"), dflt=((1,1),)) # shape in dtype var2 = Col.from_sctype("i4", shape=(2, 2), 
dflt=((1,1),(1,1))) # shape is a mix var3 = Col.from_dtype(numpy.dtype("i2"), dflt=2) var4 = Col.from_dtype(numpy.dtype("2f8"), dflt=3.1) var5 = Col.from_dtype(numpy.dtype("f4"), dflt=4.2) var6 = Col.from_dtype(numpy.dtype("()u2"), dflt=5) var7 = Col.from_dtype(numpy.dtype("1S1"), dflt="e") # no shape class BasicTestCase(common.PyTablesTestCase): #file = "test.h5" mode = "w" title = "This is the table title" expectedrows = 100 appendrows = 20 compress = 0 complib = "zlib" # Default compression library record = Record recarrayinit = 0 maxshort = 1 << 15 def setUp(self): # Create an instance of an HDF5 Table self.file = tempfile.mktemp(".h5") self.fileh = openFile(self.file, self.mode) self.rootgroup = self.fileh.root self.populateFile() self.fileh.close() def initRecArray(self): record = self.recordtemplate row = record[0] buflist = [] # Fill the recarray #for i in xrange(self.expectedrows+1): for i in xrange(self.expectedrows+1): tmplist = [] # Both forms (list or chararray) works var0 = ['%04d' % (self.expectedrows - i)] * 2 tmplist.append(var0) var1 = [['%04d' % (self.expectedrows - i)] * 2] * 2 tmplist.append(var1) var1_ = (i, 1) tmplist.append(var1_) var2 = ((i, 1), (1,1)) # *-* tmplist.append(var2) var3 = i % self.maxshort tmplist.append(var3) if isinstance(row['var4'], numpy.ndarray): tmplist.append([float(i), float(i*i)]) else: tmplist.append(float(i)) if isinstance(row['var5'], numpy.ndarray): tmplist.append(array((float(i),)*4)) else: tmplist.append(float(i)) # var6 will be like var3 but byteswaped tmplist.append(((var3>>8) & 0xff) + ((var3<<8) & 0xff00)) var7 = var1[0][0][-1] tmplist.append(var7) buflist.append(tmplist) self.record=numpy.rec.array(buflist, dtype=record.dtype, shape = self.expectedrows) return def populateFile(self): group = self.rootgroup if self.recarrayinit: # Initialize an starting buffer, if any self.initRecArray() for j in range(3): # Create a table filters = Filters(complevel = self.compress, complib = self.complib) if j < 2: 
byteorder = sys.byteorder else: # table2 will be byteswapped byteorder = {"little":"big","big":"little"}[sys.byteorder] table = self.fileh.createTable(group, 'table'+str(j), self.record, title = self.title, filters = filters, expectedrows = self.expectedrows, byteorder = byteorder) if not self.recarrayinit: # Get the row object associated with the new table row = table.row # Fill the table for i in xrange(self.expectedrows): row['var0'] = '%04d' % (self.expectedrows - i) row['var1'] = '%04d' % (self.expectedrows - i) row['var7'] = row['var1'][0][0][-1] row['var1_'] = (i, 1) row['var2'] = ((i, 1), (1,1)) # *-* row['var3'] = i % self.maxshort if isinstance(row['var4'], numpy.ndarray): row['var4'] = [float(i), float(i*i)] else: row['var4'] = float(i) if isinstance(row['var5'], numpy.ndarray): row['var5'] = array((float(i),)*4) else: row['var5'] = float(i) # var6 will be like var3 but byteswaped row['var6'] = ((row['var3']>>8) & 0xff) + \ ((row['var3']<<8) & 0xff00) #print("Saving -->", row) row.append() # Flush the buffer for this table table.flush() # Create a new group (descendant of group) group2 = self.fileh.createGroup(group, 'group'+str(j)) # Iterate over this new group (group2) group = group2 def tearDown(self): self.fileh.close() #del self.fileh, self.rootgroup os.remove(self.file) common.cleanup(self) #---------------------------------------- def test00_description(self): """Checking table description and descriptive fields""" self.fileh = openFile(self.file) tbl = self.fileh.getNode('/table0') desc = tbl.description if isinstance(self.record, dict): columns = self.record elif isinstance(self.record, numpy.ndarray): # This way of getting a (dictionary) description # can be used as long as the method does not alter the table. # Maybe there is a better way of doing this. columns = tbl._descrFromRA(self.record) else: # This is an ordinary description. columns = self.record.columns # Check table and description attributes at the same time. 
# These checks are only valid for non-nested tables. # Column names. expectedNames = ['var0', 'var1', 'var1_', 'var2', 'var3', 'var4', 'var5', 'var6', 'var7'] self.assertEqual(expectedNames, list(tbl.colnames)) self.assertEqual(expectedNames, list(desc._v_names)) # Column types. expectedTypes = [columns[colname].dtype for colname in expectedNames] self.assertEqual(expectedTypes, [tbl.coldtypes[v] for v in expectedNames]) self.assertEqual(expectedTypes, [desc._v_dtypes[v] for v in expectedNames]) # Column string types. expectedTypes = [columns[colname].type for colname in expectedNames] self.assertEqual(expectedTypes, [tbl.coltypes[v] for v in expectedNames]) self.assertEqual(expectedTypes, [desc._v_types[v] for v in expectedNames]) # Column defaults. for v in expectedNames: if common.verbose: print "dflt-->", columns[v].dflt print "coldflts-->", tbl.coldflts[v] print "desc.dflts-->", desc._v_dflts[v] assert common.areArraysEqual(tbl.coldflts[v], columns[v].dflt) assert common.areArraysEqual(desc._v_dflts[v], columns[v].dflt) # Column path names. self.assertEqual(expectedNames, list(desc._v_pathnames)) # Column objects. for colName in expectedNames: expectedCol = columns[colName] col = desc._v_colObjects[colName] self.assertEqual(expectedCol.dtype, col.dtype) self.assertEqual(expectedCol.type, col.type) def test01_readTable(self): """Checking table read and cuts""" rootgroup = self.rootgroup if common.verbose: print '\n', '-=' * 30 print "Running %s.test01_readTable..." 
% self.__class__.__name__ # Create an instance of an HDF5 Table self.fileh = openFile(self.file, "r") table = self.fileh.getNode("/table0") # Choose a small value for buffer size table.nrowsinbuf = 3 # Read the records and select those with "var2" file less than 20 result = [ rec['var2'][0][0] for rec in table.iterrows() if rec['var2'][0][0] < 20 ] if common.verbose: print "Table:", repr(table) print "Nrows in", table._v_pathname, ":", table.nrows print "Last record in table ==>", rec print "Total selected records in table ==> ", len(result) nrows = self.expectedrows - 1 assert (( rec['var0'][0], rec['var1'][0][0], rec['var1_'][0], rec['var2'][0][0], rec['var7'] ) == ( "0001", "0001", nrows, nrows, "1")) if isinstance(rec['var5'], numpy.ndarray): assert allequal(rec['var5'], array((nrows,)*4, float32)) else: assert rec['var5'] == float(nrows) assert len(result) == 20 def test01b_readTable(self): """Checking table read and cuts (multidimensional columns case)""" rootgroup = self.rootgroup if common.verbose: print '\n', '-=' * 30 print "Running %s.test01b_readTable..." 
% self.__class__.__name__ # Create an instance of an HDF5 Table self.fileh = openFile(self.file, "r") table = self.fileh.getNode("/table0") # Choose a small value for buffer size table.nrowsinbuf = 3 # Read the records and select those with "var2" file less than 20 result = [ rec['var5'] for rec in table.iterrows() if rec['var2'][0][0] < 20 ] if common.verbose: print "Nrows in", table._v_pathname, ":", table.nrows print "Last record in table ==>", rec print "Total selected records in table ==> ", len(result) nrows = table.nrows if isinstance(rec['var5'], numpy.ndarray): assert allequal(result[0], array((float(0),)*4, float32)) assert allequal(result[1], array((float(1),)*4, float32)) assert allequal(result[2], array((float(2),)*4, float32)) assert allequal(result[3], array((float(3),)*4, float32)) assert allequal(result[10], array((float(10),)*4, float32)) assert allequal(rec['var5'], array((float(nrows-1),)*4, float32)) else: assert rec['var5'] == float(nrows-1) assert len(result) == 20 # Read the records and select those with "var2" file less than 20 result = [ rec['var1'] for rec in table.iterrows() if rec['var2'][0][0] < 20 ] if rec['var1'].dtype.char == "S": a = numpy.array([['%04d' % (self.expectedrows - 0)]*2]*2) assert allequal(result[0], a) a = numpy.array([['%04d' % (self.expectedrows - 1)]*2]*2) assert allequal(result[1], a) a = numpy.array([['%04d' % (self.expectedrows - 2)]*2]*2) assert allequal(result[2], a) a = numpy.array([['%04d' % (self.expectedrows - 3)]*2]*2) assert allequal(result[3], a) a = numpy.array([['%04d' % (self.expectedrows - 10)]*2]*2) assert allequal(result[10], a) a = numpy.array([['%04d' % (1)]*2]*2) assert allequal(rec['var1'], a) else: assert rec['var1'] == "0001" assert len(result) == 20 def test02_AppendRows(self): """Checking whether appending record rows works or not""" # Now, open it, but in "append" mode self.fileh = openFile(self.file, mode = "a") self.rootgroup = self.fileh.root if common.verbose: print '\n', '-=' * 30 
print "Running %s.test02_AppendRows..." % self.__class__.__name__ # Get a table table = self.fileh.getNode("/group0/table1") # Get their row object row = table.row if common.verbose: print "Nrows in old", table._v_pathname, ":", table.nrows print "Record Format ==>", table.description._v_nestedFormats print "Record Size ==>", table.rowsize # Append some rows for i in xrange(self.appendrows): row['var0'] = '%04d' % (self.appendrows - i) row['var1'] = '%04d' % (self.appendrows - i) row['var7'] = row['var1'][0][0][-1] row['var1_'] = (i, 1) row['var2'] = ((i, 1), (1,1)) # *-* row['var3'] = i % self.maxshort if isinstance(row['var4'], numpy.ndarray): row['var4'] = [float(i), float(i*i)] else: row['var4'] = float(i) if isinstance(row['var5'], numpy.ndarray): row['var5'] = array((float(i),)*4) else: row['var5'] = float(i) row.append() # Flush the buffer for this table and read it table.flush() result = [ row['var2'][0][0] for row in table.iterrows() if row['var2'][0][0] < 20 ] nrows = self.appendrows - 1 assert (( row['var0'][0], row['var1'][0][0], row['var1_'][0], row['var2'][0][0], row['var7'] ) == ( "0001", "0001", nrows, nrows, "1")) if isinstance(row['var5'], numpy.ndarray): assert allequal(row['var5'], array((float(nrows),)*4, float32)) else: assert row['var5'] == float(nrows) if self.appendrows <= 20: add = self.appendrows else: add = 20 assert len(result) == 20 + add # because we appended new rows #del table # CAVEAT: The next test only works for tables with rows < 2**15 def test03_endianess(self): """Checking if table is endianess aware""" rootgroup = self.rootgroup if common.verbose: print '\n', '-=' * 30 print "Running %s.test03_endianess..." 
% self.__class__.__name__ # Create an instance of an HDF5 Table self.fileh = openFile(self.file, "r") table = self.fileh.getNode("/group0/group1/table2") # Read the records and select the ones with "var3" column less than 20 result = [ rec['var2'] for rec in table.iterrows() if rec['var3'] < 20] if common.verbose: print "Nrows in", table._v_pathname, ":", table.nrows print "On-disk byteorder ==>", table.byteorder print "Last record in table ==>", rec print "Total selected records in table ==>", len(result) nrows = self.expectedrows - 1 assert (rec['var1'][0][0], rec['var3']) == ("0001", nrows) assert len(result) == 20 class BasicWriteTestCase(BasicTestCase): title = "BasicWrite" pass class DictWriteTestCase(BasicTestCase): # This checks also unidimensional arrays as columns title = "DictWrite" record = RecordDescriptionDict nrows = 21 nrowsinbuf = 3 # Choose a small value for the buffer size start = 0 stop = 10 step = 3 class DTypeWriteTestCase(BasicTestCase): title = "DTypeWriteTestCase" record=RecordDT class RecArrayOneWriteTestCase(BasicTestCase): title = "RecArrayOneWrite" record=numpy.rec.array( None, formats="(2,)a4,(2,2)a4,(2,)i4,(2,2)i4,i2,2f8,f4,i2,a1", names='var0,var1,var1_,var2,var3,var4,var5,var6,var7', shape=0) class RecArrayTwoWriteTestCase(BasicTestCase): title = "RecArrayTwoWrite" expectedrows = 100 recarrayinit = 1 recordtemplate=numpy.rec.array( None, formats="(2,)a4,(2,2)a4,(2,)i4,(2,2)i4,i2,f8,f4,i2,a1", names='var0,var1,var1_,var2,var3,var4,var5,var6,var7', shape=1) class RecArrayThreeWriteTestCase(BasicTestCase): title = "RecArrayThreeWrite" expectedrows = 100 recarrayinit = 1 recordtemplate=numpy.rec.array( None, formats="(2,)a4,(2,2)a4,(2,)i4,(2,2)i4,i2,2f8,4f4,i2,a1", names='var0,var1,var1_,var2,var3,var4,var5,var6,var7', shape=1) class CompressLZOTablesTestCase(BasicTestCase): title = "CompressLZOTables" compress = 1 complib = "lzo" class CompressBZIP2TablesTestCase(BasicTestCase): title = "CompressBZIP2Tables" compress = 1 complib = 
"bzip2" class CompressZLIBTablesTestCase(BasicTestCase): title = "CompressOneTables" compress = 1 complib = "zlib" class CompressTwoTablesTestCase(BasicTestCase): title = "CompressTwoTables" compress = 1 # This checks also unidimensional arrays as columns record = RecordDescriptionDict class BigTablesTestCase(BasicTestCase): title = "BigTables" # 10000 rows takes much more time than we can afford for tests # reducing to 1000 would be more than enough # F. Altet 2004-01-19 # expectedrows = 10000 # appendrows = 1000 expectedrows = 1000 appendrows = 100 class BasicRangeTestCase(unittest.TestCase): #file = "test.h5" mode = "w" title = "This is the table title" record = Record maxshort = 1 << 15 expectedrows = 100 compress = 0 # Default values nrows = 20 nrowsinbuf = 3 # Choose a small value for the buffer size start = 1 stop = nrows checkrecarray = 0 checkgetCol = 0 def setUp(self): # Create an instance of an HDF5 Table self.file = tempfile.mktemp(".h5") self.fileh = openFile(self.file, self.mode) self.rootgroup = self.fileh.root self.populateFile() self.fileh.close() def populateFile(self): group = self.rootgroup for j in range(3): # Create a table table = self.fileh.createTable(group, 'table'+str(j), self.record, title = self.title, filters = Filters(self.compress), expectedrows = self.expectedrows) # Get the row object associated with the new table row = table.row # Fill the table for i in xrange(self.expectedrows): row['var1'] = '%04d' % (self.expectedrows - i) row['var7'] = row['var1'][0][0][-1] row['var2'] = i row['var3'] = i % self.maxshort if isinstance(row['var4'], numpy.ndarray): row['var4'] = [float(i), float(i*i)] else: row['var4'] = float(i) if isinstance(row['var5'], numpy.ndarray): row['var5'] = array((float(i),)*4) else: row['var5'] = float(i) # var6 will be like var3 but byteswaped row['var6'] = ((row['var3'] >> 8) & 0xff) + \ ((row['var3'] << 8) & 0xff00) row.append() # Flush the buffer for this table table.flush() # Create a new group (descendant of 
group) group2 = self.fileh.createGroup(group, 'group'+str(j)) # Iterate over this new group (group2) group = group2 def tearDown(self): if self.fileh.isopen: self.fileh.close() #del self.fileh, self.rootgroup os.remove(self.file) common.cleanup(self) #---------------------------------------- def check_range(self): rootgroup = self.rootgroup # Create an instance of an HDF5 Table self.fileh = openFile(self.file, "r") table = self.fileh.getNode("/table0") table.nrowsinbuf = self.nrowsinbuf r = slice(self.start, self.stop, self.step) resrange = r.indices(table.nrows) reslength = len(range(*resrange)) if self.checkrecarray: recarray = table.read(self.start, self.stop, self.step) result = [] for nrec in range(len(recarray)): if recarray['var2'][nrec][0][0] < self.nrows: result.append(recarray['var2'][nrec][0][0]) elif self.checkgetCol: column = table.read(self.start, self.stop, self.step, 'var2') result = [] for nrec in range(len(column)): if column[nrec][0][0] < self.nrows: #*-* result.append(column[nrec][0][0]) #*-* else: result = [ rec['var2'][0][0] for rec in table.iterrows(self.start, self.stop, self.step) if rec['var2'][0][0] < self.nrows ] if self.start < 0: startr = self.expectedrows + self.start else: startr = self.start if self.stop == None: stopr = startr + 1 elif self.stop < 0: stopr = self.expectedrows + self.stop else: stopr = self.stop if self.nrows < stopr: stopr = self.nrows if common.verbose: print "Nrows in", table._v_pathname, ":", table.nrows if reslength: if self.checkrecarray: print "Last record *read* in recarray ==>", recarray[-1] elif self.checkgetCol: print "Last value *read* in getCol ==>", column[-1] else: print "Last record *read* in table range ==>", rec print "Total number of selected records ==>", len(result) print "Selected records:\n", result print "Selected records should look like:\n", \ range(startr, stopr, self.step) print "start, stop, step ==>", startr, stopr, self.step assert result == range(startr, stopr, self.step) if startr < 
stopr and not (self.checkrecarray or self.checkgetCol): if self.nrows < self.expectedrows: assert rec['var2'][0][0] == \ range(self.start, self.stop, self.step)[-1] else: assert rec['var2'][0][0] == range(startr, stopr, self.step)[-1] # Close the file self.fileh.close() def test01_range(self): """Checking ranges in table iterators (case1)""" if common.verbose: print '\n', '-=' * 30 print "Running %s.test01_range..." % self.__class__.__name__ # Case where step < nrowsinbuf < 2*step self.nrows = 21 self.nrowsinbuf = 3 self.start = 0 self.stop = self.expectedrows self.step = 2 self.check_range() def test02_range(self): """Checking ranges in table iterators (case2)""" if common.verbose: print '\n', '-=' * 30 print "Running %s.test02_range..." % self.__class__.__name__ # Case where step < nrowsinbuf < 10*step self.nrows = 21 self.nrowsinbuf = 31 self.start = 11 self.stop = self.expectedrows self.step = 3 self.check_range() def test03_range(self): """Checking ranges in table iterators (case3)""" if common.verbose: print '\n', '-=' * 30 print "Running %s.test03_range..." % self.__class__.__name__ # Case where step < nrowsinbuf < 1.1*step self.nrows = self.expectedrows self.nrowsinbuf = 11 # Choose a small value for the buffer size self.start = 0 self.stop = self.expectedrows self.step = 10 self.check_range() def test04_range(self): """Checking ranges in table iterators (case4)""" if common.verbose: print '\n', '-=' * 30 print "Running %s.test04_range..." % self.__class__.__name__ # Case where step == nrowsinbuf self.nrows = self.expectedrows self.nrowsinbuf = 11 # Choose a small value for the buffer size self.start = 1 self.stop = self.expectedrows self.step = 11 self.check_range() def test05_range(self): """Checking ranges in table iterators (case5)""" if common.verbose: print '\n', '-=' * 30 print "Running %s.test05_range..." 
% self.__class__.__name__ # Case where step > 1.1*nrowsinbuf self.nrows = 21 self.nrowsinbuf = 10 # Choose a small value for the buffer size self.start = 1 self.stop = self.expectedrows self.step = 11 self.check_range() def test06_range(self): """Checking ranges in table iterators (case6)""" if common.verbose: print '\n', '-=' * 30 print "Running %s.test06_range..." % self.__class__.__name__ # Case where step > 3*nrowsinbuf self.nrows = 3 self.nrowsinbuf = 3 # Choose a small value for the buffer size self.start = 2 self.stop = self.expectedrows self.step = 10 self.check_range() def test07_range(self): """Checking ranges in table iterators (case7)""" if common.verbose: print '\n', '-=' * 30 print "Running %s.test07_range..." % self.__class__.__name__ # Case where start == stop self.nrows = 2 self.nrowsinbuf = 3 # Choose a small value for the buffer size self.start = self.nrows self.stop = self.nrows self.step = 10 self.check_range() def test08_range(self): """Checking ranges in table iterators (case8)""" if common.verbose: print '\n', '-=' * 30 print "Running %s.test08_range..." % self.__class__.__name__ # Case where start > stop self.nrows = 2 self.nrowsinbuf = 3 # Choose a small value for the buffer size self.start = self.nrows + 1 self.stop = self.nrows self.step = 1 self.check_range() def test09_range(self): """Checking ranges in table iterators (case9)""" if common.verbose: print '\n', '-=' * 30 print "Running %s.test09_range..." % self.__class__.__name__ # Case where stop = None self.nrows = 100 self.nrowsinbuf = 3 # Choose a small value for the buffer size self.start = 1 self.stop = None self.step = 1 self.check_range() def test10_range(self): """Checking ranges in table iterators (case10)""" if common.verbose: print '\n', '-=' * 30 print "Running %s.test10_range..." 
% self.__class__.__name__ # Case where start < 0 and stop = 0 self.nrows = self.expectedrows self.nrowsinbuf = 5 # Choose a small value for the buffer size self.start = -6 self.startr = self.expectedrows + self.start self.stop = 0 self.stopr = self.expectedrows + self.stop self.step = 2 self.check_range() def test11_range(self): """Checking ranges in table iterators (case11)""" if common.verbose: print '\n', '-=' * 30 print "Running %s.test11_range..." % self.__class__.__name__ # Case where start < 0 and stop < 0 self.nrows = self.expectedrows self.nrowsinbuf = 5 # Choose a small value for the buffer size self.start = -6 self.startr = self.expectedrows + self.start self.stop = -2 self.stopr = self.expectedrows + self.stop self.step = 1 self.check_range() def test12_range(self): """Checking ranges in table iterators (case12)""" if common.verbose: print '\n', '-=' * 30 print "Running %s.test12_range..." % self.__class__.__name__ # Case where start < 0 and stop < 0 and start > stop self.nrows = self.expectedrows self.nrowsinbuf = 5 # Choose a small value for the buffer size self.start = -1 self.startr = self.expectedrows + self.start self.stop = -2 self.stopr = self.expectedrows + self.stop self.step = 1 self.check_range() def test13_range(self): """Checking ranges in table iterators (case13)""" if common.verbose: print '\n', '-=' * 30 print "Running %s.test13_range..." % self.__class__.__name__ # Case where step < 0 self.step = -11 try: self.check_range() except ValueError: if common.verbose: (type, value, traceback) = sys.exc_info() print "\nGreat!, the next ValueError was catched!" print value self.fileh.close() else: print rec self.fail("expected a ValueError") # Case where step == 0 self.step = 0 try: self.check_range() except ValueError: if common.verbose: (type, value, traceback) = sys.exc_info() print "\nGreat!, the next ValueError was catched!" 
print value self.fileh.close() else: print rec self.fail("expected a ValueError") class IterRangeTestCase(BasicRangeTestCase): pass class RecArrayRangeTestCase(BasicRangeTestCase): checkrecarray = 1 class getColRangeTestCase(BasicRangeTestCase): checkgetCol = 1 def test01_nonexistentField(self): """Checking non-existing Field in getCol method """ if common.verbose: print '\n', '-=' * 30 print "Running %s.test01_nonexistentField..." % self.__class__.__name__ # Create an instance of an HDF5 Table self.fileh = openFile(self.file, "r") self.root = self.fileh.root table = self.fileh.getNode("/table0") try: column = table.read(field='non-existent-column') except KeyError: if common.verbose: (type, value, traceback) = sys.exc_info() print "\nGreat!, the next KeyError was catched!" print value pass else: print rec self.fail("expected a KeyError") class RecArrayIO(unittest.TestCase): def test00(self): "Checking saving a normal recarray" file = tempfile.mktemp(".h5") fileh = openFile(file, "w") # Create a recarray intlist1 = [[456,23]*3]*2 intlist2 = array([[2,2]*3]*2, dtype=int) arrlist1 = [['dbe']*2]*3 arrlist2 = [['de']*2]*3 floatlist1 = [[1.2,2.3]*3]*4 floatlist2 = array([[4.5,2.4]*3]*4) b = [[intlist1, arrlist1, floatlist1],[intlist2, arrlist2, floatlist2]] r=numpy.rec.array(b, formats='(2,6)i4,(3,2)a3,(4,6)f8', names='col1,col2,col3') # Save it in a table: fileh.createTable(fileh.root, 'recarray', r) # Read it again r2 = fileh.root.recarray.read() assert r.tostring() == r2.tostring() fileh.close() os.remove(file) def test01(self): "Checking saving a recarray with an offset in its buffer" file = tempfile.mktemp(".h5") fileh = openFile(file, "w") # Create a recarray intlist1 = [[456,23]*3]*2 intlist2 = array([[2,2]*3]*2, dtype=int) arrlist1 = [['dbe']*2]*3 arrlist2 = [['de']*2]*3 floatlist1 = [[1.2,2.3]*3]*4 floatlist2 = array([[4.5,2.4]*3]*4) b = [[intlist1, arrlist1, floatlist1],[intlist2, arrlist2, floatlist2]] r=numpy.rec.array(b, formats='(2,6)i4,(3,2)a3,(4,6)f8', 
names='col1,col2,col3') # Get a view of the recarray r1 = r[1:] # Save it in a table: fileh.createTable(fileh.root, 'recarray', r1) # Read it again r2 = fileh.root.recarray.read() assert r1.tostring() == r2.tostring() fileh.close() os.remove(file) def test02(self): "Checking saving a slice of a large recarray" file = tempfile.mktemp(".h5") fileh = openFile(file, "w") # Create a recarray intlist1 = [[[23,24,35]*6]*6] intlist2 = array([[[2,3,4]*6]*6], dtype=int) arrlist1 = [['dbe']*2]*3 arrlist2 = [['de']*2]*3 floatlist1 = [[1.2,2.3]*3]*4 floatlist2 = array([[4.5,2.4]*3]*4) b=[[intlist1, arrlist1, floatlist1],[intlist2, arrlist2, floatlist2]] r=numpy.rec.array(b*300, formats='(1,6,18)i4,(3,2)a3,(4,6)f8', names='col1,col2,col3') # Get an slice of recarray r1 = r[290:292] # Save it in a table: fileh.createTable(fileh.root, 'recarray', r1) # Read it again r2 = fileh.root.recarray.read() assert r1.tostring() == r2.tostring() fileh.close() os.remove(file) def test03(self): "Checking saving a slice of an strided recarray" file = tempfile.mktemp(".h5") fileh = openFile(file, "w") # Create a recarray intlist1 = [[[23,24,35]*6]*6] intlist2 = array([[[2,3,4]*6]*6], dtype=int) arrlist1 = [['dbe']*2]*3 arrlist2 = [['de']*2]*3 floatlist1 = [[1.2,2.3]*3]*4 floatlist2 = array([[4.5,2.4]*3]*4) b = [[intlist1, arrlist1, floatlist1],[intlist2, arrlist2, floatlist2]] r=numpy.rec.array(b*300, formats='(1,6,18)i4,(3,2)a3,(4,6)f8', names='col1,col2,col3', shape=300) # Get an strided recarray r2 = r[::2] # Get a slice r1 = r2[148:] # Save it in a table: fileh.createTable(fileh.root, 'recarray', r1) # Read it again r2 = fileh.root.recarray.read() assert r1.tostring() == r2.tostring() fileh.close() os.remove(file) class DefaultValues(unittest.TestCase): def test00(self): "Checking saving a Table MD with default values" file = tempfile.mktemp(".h5") #file = "/tmp/test.h5" fileh = openFile(file, "w") # Create a table table = fileh.createTable(fileh.root, 'table', Record) # Take a number of 
records a bit large #nrows = int(table.nrowsinbuf * 1.1) nrows = 5 # for test # Fill the table with nrows records for i in xrange(nrows): if i == 3 or i == 4: table.row['var2'] = ((2,2),(2,2)) #*-* # This injects the row values. table.row.append() # We need to flush the buffers in table in order to get an # accurate number of records on it. table.flush() # Create a recarray with the same default values buffer = [[ ["\x00"]*2, # just "" does not initialize the buffer properly [["abcd","efgh"]]*2, (1,1), ((1,1),(1,1)), 2, 3.1, 4.2, 5, "e"]] r = numpy.rec.array( buffer*nrows, formats='(2,)a4,(2,2)a4,(2,)i4,(2,2)i4,i2,f8,f4,u2,a1', names = ['var0', 'var1', 'var1_', 'var2', 'var3', 'var4', 'var5', 'var6', 'var7']) #*-* # Assign the value exceptions r["var2"][3] = ((2,2), (2,2)) #*-* r["var2"][4] = ((2,2), (2,2)) #*-* # Read the table in another recarray r2 = table.read() # This generates too much output. Activate only when # self.nrowsinbuf is very small (<10) if common.verbose and 1: print "Table values:" print r2 print "Record values:" print r # Both checks do work, however, tostring() seems more stringent. 
assert r.tostring() == r2.tostring() #assert common.areArraysEqual(r,r2) fileh.close() os.remove(file) class RecordT(IsDescription): var0 = IntCol(dflt=1, shape=()) # native int var1 = IntCol(dflt=[1], shape=(1,)) # 1-D int (one element) var2_s = IntCol(dflt=[1,1], shape=2) # 1-D int (two elements) var2 = IntCol(dflt=[1,1], shape=(2,)) # 1-D int (two elements) var3 = IntCol(dflt=[[0,0],[1,1]], shape=(2,2)) # 2-D int class ShapeTestCase(unittest.TestCase): def setUp(self): # Create an instance of an HDF5 Table self.file = tempfile.mktemp(".h5") self.fileh = openFile(self.file, "w") self.populateFile() def populateFile(self): table = self.fileh.createTable(self.fileh.root, 'table', RecordT) row = table.row # Fill the table with some rows with default values for i in xrange(1): row.append() # Flush the buffer for this table table.flush() def tearDown(self): self.fileh.close() os.remove(self.file) common.cleanup(self) #---------------------------------------- def test00(self): "Checking scalar shapes" if self.reopen: self.fileh.close() self.fileh = openFile(self.file) table = self.fileh.root.table if common.verbose: print "The values look like:", table.cols.var0[:] print "They should look like:", [1] # The real check assert table.cols.var0[:].tolist() == [1] def test01(self): "Checking undimensional (one element) shapes" if self.reopen: self.fileh.close() self.fileh = openFile(self.file) table = self.fileh.root.table if common.verbose: print "The values look like:", table.cols.var1[:] print "They should look like:", [[1]] # The real check assert table.cols.var1[:].tolist() == [[1]] def test02(self): "Checking undimensional (two elements) shapes" if self.reopen: self.fileh.close() self.fileh = openFile(self.file) table = self.fileh.root.table if common.verbose: print "The values look like:", table.cols.var2[:] print "They should look like:", [[1,1]] # The real check assert table.cols.var2[:].tolist() == [[1,1]] assert table.cols.var2_s[:].tolist() == [[1,1]] def 
test03(self): "Checking bidimensional shapes" if self.reopen: self.fileh.close() self.fileh = openFile(self.file) table = self.fileh.root.table if common.verbose: print "The values look like:", table.cols.var3[:] print "They should look like:", [[[0,0],[1,1]]] # The real check assert table.cols.var3[:].tolist() == [[[0,0],[1,1]]] class ShapeTestCase1(ShapeTestCase): reopen = 0 class ShapeTestCase2(ShapeTestCase): reopen = 1 class Rec(IsDescription): col1 = IntCol(pos=1, shape=(2,)) col2 = StringCol(itemsize=3, pos=2, shape=(3,)) col3 = FloatCol(pos=3, shape=(3,2)) class setItem(common.PyTablesTestCase): def setUp(self): self.file = tempfile.mktemp(".h5") self.fileh = openFile(self.file, "w") # Create a new table: self.table = self.fileh.createTable(self.fileh.root, 'recarray', Rec) self.table.nrowsinbuf = self.buffersize # set buffer value def tearDown(self): self.fileh.close() #del self.fileh, self.rootgroup os.remove(self.file) common.cleanup(self) def test01(self): "Checking modifying one table row with __setitem__" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify just one existing row table[2] = (456,'db2',1.2) # Create the modified recarray r1=records.array([[456,'dbe',1.2],[2,'ded',1.3], [456,'db2',1.2],[5,'de1',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test01b(self): "Checking modifying one table row with __setitem__ (long index)" table = self.table formats = table.description._v_nestedFormats # append new rows 
r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify just one existing row table[2] = (456,'db2',1.2) # Create the modified recarray r1=records.array([[456,'dbe',1.2],[2,'ded',1.3], [456,'db2',1.2],[5,'de1',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test02(self): "Modifying one row, with a step (__setitem__)" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify two existing rows rows = records.array([[457,'db1',1.2],[6,'de2',1.3]], formats=formats) table[1:3:2] = rows # Create the modified recarray r1=records.array([[456,'dbe',1.2],[457,'db1',1.2], [457,'db1',1.2],[5,'de1',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test03(self): "Checking modifying several rows at once (__setitem__)" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify two existing rows rows = records.array([[457,'db1',1.2],[5,'de1',1.3]], formats=formats) 
#table.modifyRows(start=1, rows=rows) table[1:3] = rows # Create the modified recarray r1=records.array([[456,'dbe',1.2],[457,'db1',1.2], [5,'de1',1.3],[5,'de1',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test04(self): "Modifying several rows at once, with a step (__setitem__)" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify two existing rows rows = records.array([[457,'db1',1.2],[6,'de2',1.3]], formats=formats) #table[1:4:2] = rows table[1::2] = rows # Create the modified recarray r1=records.array([[456,'dbe',1.2],[457,'db1',1.2], [457,'db1',1.2],[6,'de2',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test05(self): "Checking modifying one column (single element, __setitem__)" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify just one existing column table.cols.col1[1] = -1 # Create the modified recarray r1=records.array([[456,'dbe',1.2],[-1,'ded',1.3], [457,'db1',1.2],[5,'de1',1.3]], formats=formats, names = 
"col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test06a(self): "Checking modifying one column (several elements, __setitem__)" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify just one existing column table.cols.col1[1:4] = [(2,2),(3,3),(4,4)] # Create the modified recarray r1=records.array([[456,'dbe',1.2],[2,'ded',1.3], [3,'db1',1.2],[4,'de1',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test06b(self): "Checking modifying one column (iterator, __setitem__)" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify just one existing column try: for row in table.iterrows(): row['col1'] = row.nrow+1 row.append() table.flush() except NotImplementedError: if common.verbose: (type, value, traceback) = sys.exc_info() print "\nGreat!, the next NotImplementedError was catched!" 
print value else: self.fail("expected a NotImplementedError") def test07(self): "Modifying one column (several elements, __setitem__, step)" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[1,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify just one existing column table.cols.col1[1:4:2] = [(2,2),(3,3)] # Create the modified recarray r1=records.array([[456,'dbe',1.2],[2,'ded',1.3], [457,'db1',1.2],[3,'de1',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test08(self): "Modifying one column (one element, __setitem__, step)" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify just one existing column table.cols.col1[1:4:3] = [(2,2)] # Create the modified recarray r1=records.array([[456,'dbe',1.2],[2,'ded',1.3], [457,'db1',1.2],[5,'de1',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test09(self): "Modifying beyond the table extend (__setitem__, step)" table = self.table formats = table.description._v_nestedFormats # append new rows 
r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Try to modify beyond the extend # This will silently exclude the non-fitting rows rows = records.array([[457,'db1',1.2],[6,'de2',1.3],[457,'db1',1.2]], formats=formats) table[1:6:2] = rows # How it should look like r1 = records.array([[456,'dbe',1.2],[457,'db1',1.2], [457,'db1',1.2],[6,'de2',1.3]], formats=formats) # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 class setItem1(setItem): reopen=0 buffersize = 1 class setItem2(setItem): reopen=1 buffersize = 2 class setItem3(setItem): reopen=0 buffersize = 1000 class setItem4(setItem): reopen=1 buffersize = 1000 class updateRow(common.PyTablesTestCase): def setUp(self): self.file = tempfile.mktemp(".h5") self.fileh = openFile(self.file, "w") # Create a new table: self.table = self.fileh.createTable(self.fileh.root, 'recarray', Rec) self.table.nrowsinbuf = self.buffersize # set buffer value def tearDown(self): self.fileh.close() os.remove(self.file) common.cleanup(self) def test01(self): "Checking modifying one table row with Row.update" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify just one existing row for row in table.iterrows(2): (row['col1'], row['col2'], row['col3']) = [456,'db2',1.2] row.update() # Create the modified recarray r1=records.array([[456,'dbe',1.2],[2,'ded',1.3], [456,'db2',1.2],[5,'de1',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() 
self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test02(self): "Modifying one row, with a step (Row.update)" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify two existing rows for row in table.iterrows(1, 3, 2): if row.nrow == 1: (row['col1'], row['col2'], row['col3']) = [457,'db1',1.2] elif row.nrow == 3: (row['col1'], row['col2'], row['col3']) = [6,'de2',1.3] row.update() # Create the modified recarray r1=records.array([[456,'dbe',1.2],[457,'db1',1.2], [457,'db1',1.2],[5,'de1',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test03(self): "Checking modifying several rows at once (Row.update)" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify two existing rows for row in table.iterrows(1, 3): if row.nrow == 1: (row['col1'], row['col2'], row['col3']) = [457,'db1',1.2] elif row.nrow == 2: (row['col1'], row['col2'], row['col3']) = [5,'de1',1.3] row.update() # Create the modified recarray r1=records.array([[456,'dbe',1.2],[457,'db1',1.2], [5,'de1',1.3],[5,'de1',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified 
table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test04(self): "Modifying several rows at once, with a step (Row.update)" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify two existing rows for row in table.iterrows(1, stop=4, step=2): if row.nrow == 1: (row['col1'], row['col2'], row['col3']) = [457,'db1',1.2] elif row.nrow == 3: (row['col1'], row['col2'], row['col3']) = [6,'de2',1.3] row.update() # Create the modified recarray r1=records.array([[456,'dbe',1.2],[457,'db1',1.2], [457,'db1',1.2],[6,'de2',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test05(self): "Checking modifying one column (single element, Row.update)" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify just one existing column for row in table.iterrows(1): row['col1'] = -1 row.update() # Create the modified recarray r1=records.array([[456,'dbe',1.2],[-1,'ded',1.3], [457,'db1',1.2],[5,'de1',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() self.fileh = 
openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test06(self): "Checking modifying one column (several elements, Row.update)" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[2,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify just one existing column for row in table.iterrows(1,4): row['col1'] = row.nrow+1 row.update() # Create the modified recarray r1=records.array([[456,'dbe',1.2],[2,'ded',1.3], [3,'db1',1.2],[4,'de1',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test07(self): "Modifying values from a selection" table = self.table formats = table.description._v_nestedFormats # append new rows r=records.array([[456,'dbe',1.2],[1,'ded',1.3]], formats=formats) table.append(r) table.append([[457,'db1',1.2],[5,'de1',1.3]]) # Modify just rows with col1 < 456 for row in table.iterrows(): if row['col1'][0] < 456: row['col1'] = 2 row['col2'] = 'ada' row.update() # Create the modified recarray r1=records.array([[456,'dbe',1.2],[2,'ada',1.3], [457,'db1',1.2],[2,'ada',1.3]], formats=formats, names = "col1,col2,col3") # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", 
repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == 4 def test08(self): "Modifying a large table (Row.update)" table = self.table formats = table.description._v_nestedFormats nrows = 100 # append new rows row = table.row for i in xrange(nrows): row['col1'] = i-1 row['col2'] = 'a'+str(i-1) row['col3'] = -1.0 row.append() table.flush() # Modify all the rows for row in table.iterrows(): row['col1'] = row.nrow row['col2'] = 'b'+str(row.nrow) row['col3'] = 0.0 row.update() # Create the modified recarray r1=records.array(None, shape=nrows, formats=formats, names = "col1,col2,col3") for i in xrange(nrows): r1['col1'][i] = i r1['col2'][i] = 'b'+str(i) r1['col3'][i] = 0.0 # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == nrows def test08b(self): "Setting values on a large table without calling Row.update" table = self.table formats = table.description._v_nestedFormats nrows = 100 # append new rows row = table.row for i in xrange(nrows): row['col1'] = i-1 row['col2'] = 'a'+str(i-1) row['col3'] = -1.0 row.append() table.flush() # Modify all the rows (actually don't) for row in table.iterrows(): row['col1'] = row.nrow row['col2'] = 'b'+str(row.nrow) row['col3'] = 0.0 #row.update() # Create the modified recarray r1=records.array(None, shape=nrows, formats=formats, names = "col1,col2,col3") for i in xrange(nrows): r1['col1'][i] = i-1 r1['col2'][i] = 'a'+str(i-1) r1['col3'][i] = -1.0 # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original 
table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == nrows def test09(self): "Modifying selected values on a large table" table = self.table formats = table.description._v_nestedFormats nrows = 100 # append new rows row = table.row for i in xrange(nrows): row['col1'] = i-1 row['col2'] = 'a'+str(i-1) row['col3'] = -1.0 row.append() table.flush() # Modify selected rows for row in table.iterrows(): if row['col1'][0] > nrows-3: row['col1'] = row.nrow row['col2'] = 'b'+str(row.nrow) row['col3'] = 0.0 row.update() # Create the modified recarray r1=records.array(None, shape=nrows, formats=formats, names = "col1,col2,col3") for i in xrange(nrows): r1['col1'][i] = i-1 r1['col2'][i] = 'a'+str(i-1) r1['col3'][i] = -1.0 # modify just the last line r1['col1'][i] = i r1['col2'][i] = 'b'+str(i) r1['col3'][i] = 0.0 # Read the modified table if self.reopen: self.fileh.close() self.fileh = openFile(self.file, "r") table = self.fileh.root.recarray table.nrowsinbuf = self.buffersize # set buffer value r2 = table.read() if common.verbose: print "Original table-->", repr(r2) print "Should look like-->", repr(r1) assert r1.tostring() == r2.tostring() assert table.nrows == nrows def test09b(self): "Modifying selected values on a large table (alternate values)" table = self.table formats = table.description._v_nestedFormats nrows = 100 # append new rows row = table.row for i in xrange(nrows): row['col1'] = i-1 row['col2'] = 'a'+str(i-1) row['col3'] = -1.0 row.append() table.flush() # Modify selected rows for row in table.iterrows(step=10): row['col1'] = row.nrow row['col2'] = 'b'+str(row.nrow) row['col3'] = 0.0 row.update() # Create the modified recarray r1=records.array(None, shape=nrows, formats=formats, names = "col1,col2,col3") for i in xrange(nrows): if i % 10 > 0: r1['col1'][i] = i-1 r1['col2'][i] = 'a'+str(i-1) r1['col3'][i] = -1.0 else: r1['col1'][i] = i r1['col2'][i] = 'b'+str(i) r1['col3'][i] = 0.0 # Read the 
def suite():
    """Build and return the unittest suite for this module.

    The regular test cases are registered ``niter`` times, in a fixed order;
    three more cases are appended once when ``common.heavy`` is true.
    """
    niter = 1
    #common.heavy = 1  # Uncomment this only for testing purposes

    # Cases that always run, in registration order
    regular_cases = (
        BasicWriteTestCase,
        DictWriteTestCase,
        DTypeWriteTestCase,
        RecArrayOneWriteTestCase,
        RecArrayTwoWriteTestCase,
        RecArrayThreeWriteTestCase,
        CompressZLIBTablesTestCase,
        CompressTwoTablesTestCase,
        IterRangeTestCase,
        RecArrayRangeTestCase,
        getColRangeTestCase,
        DefaultValues,
        RecArrayIO,
        ShapeTestCase1,
        ShapeTestCase2,
        setItem1,
        setItem2,
        setItem3,
        setItem4,
        updateRow1,
        updateRow2,
        updateRow3,
        updateRow4,
    )

    collected = unittest.TestSuite()
    for _ in range(niter):
        for case in regular_cases:
            collected.addTest(unittest.makeSuite(case))

    if common.heavy:
        # Extra cases, only registered in heavy mode
        for case in (CompressLZOTablesTestCase,
                     CompressBZIP2TablesTestCase,
                     BigTablesTestCase):
            collected.addTest(unittest.makeSuite(case))

    return collected