Package linda :: Module unpack
[hide private]

Source Code for Module linda.unpack

  1  import os, sys, re, shutil, random 
  2  from linda import clparser 
  3  from linda.debug import dprint, vprint 
  4  from linda.collector import Collector 
  5  from linda.funcs import run_external_cmd, iterate_dir, ExtCmdException 
  6  from linda.parser.control import DebianControlParser, DCPException 
  7  from linda.parser.dsc import DSCParser, DSCParserException, \ 
  8      filter_for_src_dir, file_or_sym 
  9  from linda.parser.unixperm import UnixPermParser 
 10   
11 -class Unpacker:
12 - def __init__(self):
13 self.lab = self.set_up_lab() 14 self.information = {'control': {'self': {}, 'info': {}}, 'dsc': [], \ 15 'dir': '', 'collector': None} 16 self.files = {'tarball': '', 'patch': ''}
17 18
19 - def unpack(self, file, level=1):
20 if hasattr(self, 'unpack_%s_%d' % (file[-3:], level)): 21 getattr(self, 'unpack_%s_%d' % (file[-3:], level))(file) 22 else: 23 raise UnpackException("No idea how to unpack %s at level %d" % \ 24 (file, level))
25
26 - def unpack_deb_1(self, file):
27 try: 28 dprint(_("Unpacking binary, level 1")) 29 cur_dir = os.getcwd() 30 os.chdir(self.lab) 31 run_external_cmd('ar x %s' % file) 32 os.chdir(cur_dir) 33 os.unlink(os.path.join(self.lab, 'debian-binary')) # For now 34 output = run_external_cmd('tar zxvvCf %s %s' % \ 35 (os.path.join(self.lab, 'control'), os.path.join(self.lab, \ 36 'control.tar.gz'))) 37 os.unlink(os.path.join(self.lab, 'control.tar.gz')) 38 try: 39 self.information['control']['self'] = \ 40 DebianControlParser('%s/control/control' % self.lab) 41 except DCPException, e: 42 raise UnpackException(e) 43 for x in output.split('\n'): 44 tmp_array = x.split(' ') 45 cur_file = tmp_array[-1] 46 if cur_file == './': continue 47 self.information['control']['info'][cur_file[2:]] = \ 48 [tmp_array[1], UnixPermParser(tmp_array[0])] 49 dprint(_("Control info: %s.") % \ 50 self.information['control']['info'], 2) 51 except EnvironmentError: 52 raise UnpackException("Level 1 binary unpacking failed!")
53
54 - def unpack_deb_2(self, file):
55 try: 56 dprint(_("Jumping to level 2 (binary)")) 57 try: 58 output = run_external_cmd('LANG=C tar zvvxCf %s %s' % \ 59 (os.path.join(self.lab, 'unpacked'), \ 60 os.path.join(self.lab, 'data.tar.gz'))) 61 except ExtCmdException: 62 raise UnpackException("Could not unpack data tarball") 63 os.unlink(os.path.join(self.lab, 'data.tar.gz')) 64 self.information['dir'] = self.lab 65 if not clparser['unpack']: 66 self.information['collector'] = Collector('bin', \ 67 '%s/unpacked' % self.lab, output) 68 for x in ('files', 'dirs', 'elf'): 69 dprint(_("Files: %s: %s") % (x, \ 70 self.information['collector']('files', x)), 4) 71 for x in ('file', 'ldd', 'objdump'): 72 dprint(_("Output: %s: %s") % (x, \ 73 self.information['collector']('output', x)), 4) 74 except EnvironmentError: 75 raise UnpackException("Level 2 binary unpacking failed!")
76
77 - def unpack_dsc_1(self, file):
78 try: 79 self.information['dsc'] = DSCParser(file) 80 except DSCParserException: 81 raise UnpackException("Can not parse .dsc") 82 dprint(_("Parsed .dsc: %s.") % self.information['dsc'])
83
84 - def unpack_dsc_2(self, file):
85 dprint(_("Jumping to level 2 (source)")) 86 cur_dir = os.getcwd() 87 os.chdir(self.lab) 88 unparsed_ver = self.information['dsc']['version'] 89 if unparsed_ver.find('-') != -1: 90 version = unparsed_ver[:unparsed_ver.rfind('-')] 91 else: 92 version = unparsed_ver 93 if version.find(':') != -1: 94 version = version.split(':')[1] 95 new_directory = '%s-%s' % (self.information['dsc']['source'], version) 96 self.information['dir'] = os.path.abspath(new_directory) 97 keeper, throw = os.path.split(file) 98 for i in self.information['dsc']['files']: 99 tmp = i.split(' ') 100 if tmp[3].endswith('tar.gz'): 101 self.files['tarball'] = '%s/%s' % (keeper, tmp[3]) 102 elif tmp[3].endswith('diff.gz'): 103 self.files['patch'] = '%s/%s' % (keeper, tmp[3]) 104 filename = {'tar': os.path.split(self.files['tarball'])[1], 'patch': \ 105 os.path.split(self.files['patch'])[1]} 106 dprint(_("Untaring source: %s") % filename['tar']) 107 if os.access(self.files['tarball'], os.R_OK): 108 run_external_cmd('tar zxf %s' % self.files['tarball']) 109 else: 110 raise UnpackException, "%s doesn't exist or isn't readable" % \ 111 self.files['tarball'] 112 snapshot_lab = filter(filter_for_src_dir, os.listdir(self.lab)) 113 if len(snapshot_lab) == 1: 114 if snapshot_lab[0] != new_directory: 115 if os.path.isdir(os.path.join(self.lab, snapshot_lab[0])): 116 dprint(_("Renaming source directory: %s -> %s") % \ 117 (snapshot_lab[0], new_directory)) 118 os.rename(snapshot_lab[0], new_directory) 119 else: 120 dprint(_("Creating new directory: %s") % new_directory) 121 os.mkdir(os.path.join(self.lab, new_directory)) 122 dprint(_("Moving %s into %s.") % (snapshot_lab[0], \ 123 new_directory)) 124 os.rename(os.path.join(self.lab, snapshot_lab[0]), \ 125 os.path.join(self.lab, new_directory, snapshot_lab[0])) 126 elif len(snapshot_lab) == 0: 127 raise UnpackException("no source files found.") 128 else: 129 if not os.path.exists(new_directory): 130 os.mkdir(new_directory) 131 for file in snapshot_lab: 132 if file == new_directory: 133 continue 134 dprint(_("Moving %s into %s") % (file, new_directory)) 135 os.rename(file, os.path.join(new_directory, file)) 136 if filename['patch']: 137 dprint(_("Applying patch: %s.") % filename['patch']) 138 try: 139 run_external_cmd('zcat %s | patch -p0 -g 0 -t -s' % \ 140 self.files['patch']) 141 except ExtCmdException: 142 dprint(out) 143 raise UnpackException('patch failed to run!') 144 try: 145 dprint(_("Parsing control file.")) 146 self.information['control']['self'] = \ 147 DebianControlParser(os.path.join(self.information['dir'], \ 148 'debian', 'control')) 149 except DCPException, e: 150 raise UnpackException(e) 151 self.information['collector'] = Collector('src', self.lab, \ 152 filter(file_or_sym, iterate_dir(new_directory))) 153 os.chdir(cur_dir) 154 dprint(_("Files: %s") % \ 155 self.information['collector']('files', 'files'), 4) 156 dprint(_("Output: %s") % \ 157 self.information['collector']('output', 'file'), 4)
158
159 - def set_up_lab(self):
160 for dir in (clparser['lab-root'], os.environ.get('TMPDIR'), '/tmp'): 161 if dir: 162 lab_root = dir 163 break 164 lab_directory = "%s/linda-lab-%05d" % (lab_root, \ 165 random.choice(range(100000))) 166 if os.path.exists(lab_directory): 167 dprint(_("Lab %s already exists; trying again.") % lab_directory) 168 lab_directory = "%s/linda-lab-%05d" % (lab_root, \ 169 random.choice(range(100000))) 170 try: 171 os.makedirs(lab_directory + '/control') 172 os.mkdir(lab_directory + '/unpacked') 173 dprint(_("Creating lab directory: %s.") % lab_directory) 174 vprint(_("Creating lab directory: %s.") % lab_directory, 2) 175 return lab_directory 176 except OSError, e: 177 raise UnpackException('Failed to create lab directory: %s.' % e)
178
179 - def cull_lab(self):
180 real_cull(self.lab)
181
182 -class UnpackException(Exception):
183 pass
184
185 -def real_cull(lab):
186 # So I don't have to duplicate lab cleaning code. 187 if not clparser['no-cull']: 188 try: 189 shutil.rmtree(lab) 190 except OSError: 191 print _("Failed to remove lab directory: %s") % lab 192 dprint(_("Removing lab directory: %s.") % lab) 193 vprint(_("Removing lab directory: %s.") % lab, 2)
194