|
1 """Macintosh binhex compression/decompression. |
|
2 |
|
3 easy interface: |
|
4 binhex(inputfilename, outputfilename) |
|
5 hexbin(inputfilename, outputfilename) |
|
6 """ |
|
7 |
|
8 # |
|
9 # Jack Jansen, CWI, August 1995. |
|
10 # |
|
11 # The module is supposed to be as compatible as possible. Especially the |
|
12 # easy interface should work "as expected" on any platform. |
|
13 # XXXX Note: currently, textfiles appear in mac-form on all platforms. |
|
14 # We seem to lack a simple character-translate in python. |
|
15 # (we should probably use ISO-Latin-1 on all but the mac platform). |
|
16 # XXXX The simple routines are too simple: they expect to hold the complete |
|
17 # files in-core. Should be fixed. |
|
18 # XXXX It would be nice to handle AppleDouble format on unix |
|
19 # (for servers serving macs). |
|
20 # XXXX I don't understand what happens when you get 0x90 times the same byte on |
|
21 # input. The resulting code (xx 90 90) would appear to be interpreted as an |
|
22 # escaped *value* of 0x90. All coders I've seen appear to ignore this nicety... |
|
23 # |
|
24 import sys |
|
25 import os |
|
26 import struct |
|
27 import binascii |
|
28 |
|
# Public API: the two one-shot file helpers plus the module's exception type.
__all__ = ["binhex","hexbin","Error"]
|
30 |
|
class Error(Exception):
    """Raised for malformed binhex input or for misuse of the coder objects."""
|
33 |
|
# Progress states: the last section of the stream that has been handled.
_DID_HEADER, _DID_DATA, _DID_RSRC = range(3)

# Format and tuning constants.
REASONABLY_LARGE = 32768    # smallest buffer handed to the RLE coder at once
LINELEN = 64                # characters per line of hqx-encoded output
RUNCHAR = chr(0x90)         # byte that introduces a run-length sequence
|
41 |
|
42 # |
|
43 # This code is no longer byte-order dependent |
|
44 |
|
45 # |
|
46 # Workarounds for non-mac machines. |
|
try:
    from Carbon.File import FSSpec, FInfo
    from MacOS import openrf

    def getfileinfo(name):
        """Return (filename, finfo, data_length, resource_length) for name.

        finfo comes from FSSpec(name).FSpGetFInfo(); each length is found
        by seeking to the end of the corresponding fork.
        """
        finfo = FSSpec(name).FSpGetFInfo()
        basename = os.path.split(name)[1]
        # XXX Get resource/data sizes
        # Close each handle explicitly instead of leaving it to the
        # garbage collector.
        fp = open(name, 'rb')
        try:
            fp.seek(0, 2)
            dlen = fp.tell()
        finally:
            fp.close()
        fp = openrf(name, '*rb')
        try:
            fp.seek(0, 2)
            rlen = fp.tell()
        finally:
            fp.close()
        return basename, finfo, dlen, rlen

    def openrsrc(name, *mode):
        """Open name through openrf(); the mode defaults to 'rb'.

        A '*' is prepended to the mode before it is handed to openrf().
        """
        if not mode:
            mode = '*rb'
        else:
            mode = '*' + mode[0]
        return openrf(name, mode)

except ImportError:
    #
    # Glue code for non-macintosh usage
    #

    class FInfo:
        """Minimal stand-in holding the Type, Creator and Flags fields."""

        def __init__(self):
            self.Type = '????'
            self.Creator = '????'
            self.Flags = 0

    def getfileinfo(name):
        """Return (filename, finfo, data_length, 0) for name.

        The first 256 characters are scanned; if every one of them is
        whitespace or in the range ' '..0x7f, finfo.Type becomes 'TEXT',
        otherwise it stays '????'.  The first ':' in the file name is
        replaced by '-'.  The resource length is always 0 here.
        """
        finfo = FInfo()
        # One handle serves both the text sniff and the size measurement,
        # and it is always closed.
        fp = open(name)
        try:
            # Quick check for textfile
            data = fp.read(256)
            for c in data:
                if not c.isspace() and (c < ' ' or ord(c) > 0x7f):
                    break
            else:
                finfo.Type = 'TEXT'
            fp.seek(0, 2)
            dsize = fp.tell()
        finally:
            fp.close()
        basename = os.path.split(name)[1]
        basename = basename.replace(':', '-', 1)
        return basename, finfo, dsize, 0

    class openrsrc:
        """Dummy resource fork: reads as empty and discards all writes."""

        def __init__(self, *args):
            pass

        def read(self, *args):
            return ''

        def write(self, *args):
            pass

        def close(self):
            pass
|
110 |
|
class _Hqxcoderengine:
    """Hqx-encode written data and emit it to a file object as lines.

    Input is encoded in multiples of three bytes; any remainder waits in
    self.data until more input arrives or close() is called.  Encoded text
    waits in self.hqxdata until a full line can be written.
    """

    def __init__(self, ofp):
        self.ofp = ofp
        self.data = ''
        self.hqxdata = ''
        # The first line is one character shorter than the rest
        # (presumably to leave room for a ':' the caller already wrote).
        self.linelen = LINELEN - 1

    def write(self, data):
        """Buffer data and encode every complete 3-byte group."""
        pending = self.data + data
        usable = len(pending) - len(pending) % 3
        self.data = pending[usable:]
        if usable == 0:
            return
        self.hqxdata += binascii.b2a_hqx(pending[:usable])
        self._flush(0)

    def _flush(self, force):
        """Write all complete lines; if force, also the tail plus ':'."""
        encoded = self.hqxdata
        start = 0
        while len(encoded) - start >= self.linelen:
            stop = start + self.linelen
            self.ofp.write(encoded[start:stop] + '\n')
            # Every line after the first has the full length.
            self.linelen = LINELEN
            start = stop
        self.hqxdata = encoded[start:]
        if force:
            self.ofp.write(self.hqxdata + ':\n')

    def close(self):
        """Encode leftover bytes, write the final line and close the file."""
        leftover = self.data
        if leftover:
            self.hqxdata = self.hqxdata + binascii.b2a_hqx(leftover)
        self._flush(1)
        self.ofp.close()
        del self.ofp
|
149 |
|
class _Rlecoderengine:
    """Collect written data and pass it through the RLE coder in big pieces.

    Data accumulates in self.data; once at least REASONABLY_LARGE bytes
    are buffered they are run-length coded and written to the wrapped
    output object.
    """

    def __init__(self, ofp):
        self.ofp = ofp
        self.data = ''

    def write(self, data):
        """Buffer data, coding and forwarding it once enough has gathered."""
        self.data += data
        if len(self.data) >= REASONABLY_LARGE:
            self.ofp.write(binascii.rlecode_hqx(self.data))
            self.data = ''

    def close(self):
        """Code and forward whatever is still buffered, then close the output."""
        if self.data:
            self.ofp.write(binascii.rlecode_hqx(self.data))
        self.ofp.close()
        del self.ofp
|
171 |
|
class BinHex:
    """Incrementally write a binhex file.

    Construct with a (name, finfo, dlen, rlen) tuple and either an output
    file object or an output file name; the header is written at once.
    Then pass exactly dlen bytes to write(), exactly rlen bytes to
    write_rsrc(), and finish with close().  Header, data and resource
    sections are each followed by their CRC.
    """

    def __init__(self, name_finfo_dlen_rlen, ofp):
        """Write the banner and header.

        name_finfo_dlen_rlen -- 4-tuple (name, finfo, dlen, rlen); finfo
            may be None, in which case a default FInfo() is used.
        ofp -- writable file object, or a file name to create.

        Raises Error if the name is longer than 63 characters.
        """
        # Unpacked by hand rather than in the signature: tuple parameters
        # are a Python-2-only spelling, and callers pass the tuple
        # positionally either way.
        name, finfo, dlen, rlen = name_finfo_dlen_rlen
        opened_here = False
        if isinstance(ofp, str):
            ofname = ofp
            ofp = open(ofname, 'w')
            opened_here = True
        ok = False
        try:
            if opened_here and os.name == 'mac':
                fss = FSSpec(ofname)
                fss.SetCreatorType('BnHq', 'TEXT')
            ofp.write('(This file must be converted with BinHex 4.0)\n\n:')
            hqxer = _Hqxcoderengine(ofp)
            self.ofp = _Rlecoderengine(hqxer)
            self.crc = 0
            if finfo is None:
                finfo = FInfo()
            self.dlen = dlen
            self.rlen = rlen
            self._writeinfo(name, finfo)
            self.state = _DID_HEADER
            ok = True
        finally:
            # Don't leak a file we opened ourselves if the header fails.
            if opened_here and not ok:
                ofp.close()

    def _writeinfo(self, name, finfo):
        """Write the header (name, type, creator, flags, lengths) and its CRC."""
        nl = len(name)
        if nl > 63:
            raise Error('Filename too long')
        d = chr(nl) + name + '\0'
        d2 = finfo.Type + finfo.Creator

        # Force all structs to be packed with big-endian
        d3 = struct.pack('>h', finfo.Flags)
        d4 = struct.pack('>ii', self.dlen, self.rlen)
        info = d + d2 + d3 + d4
        self._write(info)
        self._writecrc()

    def _write(self, data):
        """Fold data into the running CRC and pass it to the coder chain."""
        self.crc = binascii.crc_hqx(data, self.crc)
        self.ofp.write(data)

    def _writecrc(self):
        """Write the running CRC as two big-endian bytes and reset it."""
        # XXXX Should this be here??
        # self.crc = binascii.crc_hqx('\0\0', self.crc)
        if self.crc < 0:
            fmt = '>h'
        else:
            fmt = '>H'
        self.ofp.write(struct.pack(fmt, self.crc))
        self.crc = 0

    def write(self, data):
        """Write data-fork bytes; only allowed before close_data()."""
        if self.state != _DID_HEADER:
            raise Error('Writing data at the wrong time')
        self.dlen = self.dlen - len(data)
        self._write(data)

    def close_data(self):
        """Finish the data fork and write its CRC.

        Raises Error unless exactly the announced dlen bytes were written.
        """
        if self.dlen != 0:
            # Report the data-fork shortfall (the original reported rlen).
            raise Error('Incorrect data size, diff=%r' % (self.dlen,))
        self._writecrc()
        self.state = _DID_DATA

    def write_rsrc(self, data):
        """Write resource-fork bytes, closing the data fork first if needed."""
        if self.state < _DID_DATA:
            self.close_data()
        if self.state != _DID_DATA:
            raise Error('Writing resource data at the wrong time')
        self.rlen = self.rlen - len(data)
        self._write(data)

    def close(self):
        """Finish the resource fork, write its CRC and close the output.

        Raises Error if the amount of data written does not match the
        announced lengths.  The output is closed even when that happens,
        and calling close() again afterwards does nothing.
        """
        if self.state is None:
            return
        try:
            if self.state < _DID_DATA:
                self.close_data()
            if self.state != _DID_DATA:
                raise Error('Close at the wrong time')
            if self.rlen != 0:
                raise Error(
                    "Incorrect resource-datasize, diff=%r" % (self.rlen,))
            self._writecrc()
        finally:
            self.state = None
            ofp = self.ofp
            del self.ofp
            ofp.close()
|
251 |
|
def binhex(inp, out):
    """(infilename, outfilename) - Create binhex-encoded copy of a file"""
    finfo = getfileinfo(inp)
    ofp = BinHex(finfo, out)

    # Copy the data fork; the input is closed even if encoding fails.
    ifp = open(inp, 'rb')
    try:
        # XXXX Do textfile translation on non-mac systems
        while True:
            d = ifp.read(128000)
            if not d:
                break
            ofp.write(d)
        ofp.close_data()
    finally:
        ifp.close()

    # Copy the resource fork the same way, then finish the output.
    ifp = openrsrc(inp, 'rb')
    try:
        while True:
            d = ifp.read(128000)
            if not d:
                break
            ofp.write_rsrc(d)
        ofp.close()
    finally:
        ifp.close()
|
273 |
|
class _Hqxdecoderengine:
    """Read data via the decoder in 4-byte chunks"""

    def __init__(self, ifp):
        self.ifp = ifp
        # Set from the "done" flag that binascii.a2b_hqx() returns.
        self.eof = 0

    def read(self, totalwtd):
        """Read at least totalwtd decoded bytes.

        Fewer bytes are returned only once the decoder has reported the
        end of the encoded data (self.eof).  Raises Error if the input
        file runs out before that point.
        """
        decdata = ''
        wtd = totalwtd
        #
        # The loop here is convoluted, since we don't really know how
        # much to decode: there may be newlines in the incoming data.
        while wtd > 0:
            if self.eof:
                return decdata
            wtd = ((wtd+2)//3)*4
            data = self.ifp.read(wtd)
            if not data:
                # The file ended before the decoder signalled the end of
                # the data.  An empty string decodes to nothing without
                # raising Incomplete, so without this check the loop
                # would spin forever once decdata is non-empty.
                raise Error('Premature EOF on binhex file')
            #
            # Next problem: there may not be a complete number of
            # bytes in what we pass to a2b. Solve by yet another
            # loop.
            #
            while True:
                try:
                    decdatacur, self.eof = binascii.a2b_hqx(data)
                    break
                except binascii.Incomplete:
                    pass
                newdata = self.ifp.read(1)
                if not newdata:
                    raise Error('Premature EOF on binhex file')
                data = data + newdata
            decdata = decdata + decdatacur
            wtd = totalwtd - len(decdata)
            if not decdata and not self.eof:
                raise Error('Premature EOF on binhex file')
        return decdata

    def close(self):
        """Close the underlying input file."""
        self.ifp.close()
|
317 |
|
class _Rledecoderengine:
    """Run-length decode the data read from another reader object.

    pre_buffer holds bytes not yet run-length decoded; post_buffer holds
    decoded bytes that have not been handed out yet.
    """

    def __init__(self, ifp):
        self.ifp = ifp
        self.pre_buffer = ''
        self.post_buffer = ''
        self.eof = 0

    def read(self, wtd):
        """Return up to wtd decoded bytes, refilling the buffer if short."""
        shortfall = wtd - len(self.post_buffer)
        if shortfall > 0:
            self._fill(shortfall)
        head = self.post_buffer[:wtd]
        self.post_buffer = self.post_buffer[wtd:]
        return head

    def _fill(self, wtd):
        """Read more input and decode as much of it as is safe to decode."""
        raw = self.pre_buffer + self.ifp.read(wtd + 4)
        self.pre_buffer = raw
        if self.ifp.eof:
            # Nothing more will arrive, so decode everything that is left.
            self.post_buffer = self.post_buffer + \
                binascii.rledecode_hqx(raw)
            self.pre_buffer = ''
            return

        #
        # A run sequence must not be cut in two, so some trailing bytes
        # are held back, depending on how the buffer ends (tested in
        # exactly this order):
        #       '\220\0\220' - hold back 3: repeated \220 (escaped as \220\0)
        #       '?\220'      - hold back 2: repeated something-else
        #       '\220\0'     - hold back 2: escaped \220
        #       '?\220?'     - complete repeat sequence: decode all
        #       otherwise    - hold back 1
        #
        if raw[-3:] == RUNCHAR + '\0' + RUNCHAR:
            held_back = 3
        elif raw[-1] == RUNCHAR:
            held_back = 2
        elif raw[-2:] == RUNCHAR + '\0':
            held_back = 2
        elif raw[-2] == RUNCHAR:
            held_back = 0
        else:
            held_back = 1
        mark = len(raw) - held_back

        self.post_buffer = self.post_buffer + \
            binascii.rledecode_hqx(raw[:mark])
        self.pre_buffer = raw[mark:]

    def close(self):
        """Close the wrapped reader."""
        self.ifp.close()
|
371 |
|
class HexBin:
    """Incrementally read a binhex file.

    Construct with an input file object or file name; the header is read
    and checked at once and exposed as FName (file name) and FInfo (Type,
    Creator, Flags).  Then use read() for the data fork, read_rsrc() for
    the resource fork, and close().  The CRC of each section is verified;
    Error is raised on a mismatch or on truncated input.
    """

    def __init__(self, ifp):
        opened_here = False
        if isinstance(ifp, str):
            ifp = open(ifp)
            opened_here = True
        ok = False
        try:
            #
            # Find initial colon.
            #
            while True:
                ch = ifp.read(1)
                if not ch:
                    raise Error("No binhex data found")
                # Cater for \r\n terminated lines (which show up as \n\r,
                # hence all lines start with \r)
                if ch == '\r':
                    continue
                if ch == ':':
                    break
                if ch != '\n':
                    # Not the start of the data: skip the rest of the line.
                    ifp.readline()

            hqxifp = _Hqxdecoderengine(ifp)
            self.ifp = _Rledecoderengine(hqxifp)
            self.crc = 0
            self._readheader()
            ok = True
        finally:
            # Don't leak a file we opened ourselves if the header is bad.
            if opened_here and not ok:
                ifp.close()

    def _read(self, len):
        """Read up to len decoded bytes and fold them into the running CRC."""
        data = self.ifp.read(len)
        self.crc = binascii.crc_hqx(data, self.crc)
        return data

    def _readexact(self, count, checksum=True):
        """Read exactly count bytes, looping over short reads.

        With checksum true the bytes go through _read() and so update the
        running CRC; otherwise they are read straight from the decoder.
        Raises Error if the stream ends first.
        """
        data = ''
        while len(data) < count:
            want = count - len(data)
            if checksum:
                chunk = self._read(want)
            else:
                chunk = self.ifp.read(want)
            if not chunk:
                # An empty read would otherwise make this loop spin forever.
                raise Error('Premature EOF on binhex file')
            # Start from the first chunk so the result keeps the string
            # type the stream yields.
            if data:
                data = data + chunk
            else:
                data = chunk
        return data

    def _checkcrc(self):
        """Read the stored 2-byte CRC and compare it with the running CRC."""
        filecrc = struct.unpack('>h', self._readexact(2, False))[0] & 0xffff
        #self.crc = binascii.crc_hqx('\0\0', self.crc)
        # XXXX Is this needed??
        self.crc = self.crc & 0xffff
        if filecrc != self.crc:
            raise Error('CRC error, computed %x, read %x'
                        % (self.crc, filecrc))
        self.crc = 0

    def _readheader(self):
        """Read and verify the header; set FName, FInfo, dlen and rlen."""
        namelen = self._readexact(1)
        fname = self._readexact(ord(namelen))
        # One byte that is skipped, 4 type, 4 creator, 2 flags,
        # 4 data length, 4 resource length.
        rest = self._readexact(1+4+4+2+4+4)
        self._checkcrc()

        ftype = rest[1:5]
        creator = rest[5:9]
        flags = struct.unpack('>h', rest[9:11])[0]
        self.dlen = struct.unpack('>l', rest[11:15])[0]
        self.rlen = struct.unpack('>l', rest[15:19])[0]

        self.FName = fname
        self.FInfo = FInfo()
        self.FInfo.Creator = creator
        self.FInfo.Type = ftype
        self.FInfo.Flags = flags

        self.state = _DID_HEADER

    def read(self, *n):
        """Read up to n bytes of the data fork (all that remains if omitted)."""
        if self.state != _DID_HEADER:
            raise Error('Read data at wrong time')
        if n:
            n = n[0]
            n = min(n, self.dlen)
        else:
            n = self.dlen
        rv = self._readexact(n)
        self.dlen = self.dlen - n
        return rv

    def close_data(self):
        """Skip any unread data-fork bytes and verify the data-fork CRC."""
        if self.state != _DID_HEADER:
            raise Error('close_data at wrong time')
        if self.dlen:
            self._readexact(self.dlen)
        self._checkcrc()
        self.state = _DID_DATA

    def read_rsrc(self, *n):
        """Read up to n bytes of the resource fork (all that remains if omitted).

        The data fork is closed first if that has not happened yet.
        """
        if self.state == _DID_HEADER:
            self.close_data()
        if self.state != _DID_DATA:
            raise Error('Read resource data at wrong time')
        if n:
            n = n[0]
            n = min(n, self.rlen)
        else:
            n = self.rlen
        rv = self._readexact(n)
        self.rlen = self.rlen - n
        return rv

    def close(self):
        """Verify the resource-fork CRC and close the input.

        Unread data and resource bytes are skipped first.  The input is
        closed even if verification fails, and calling close() again
        afterwards does nothing.
        """
        if self.state == _DID_RSRC:
            return
        try:
            # The resource CRC follows the data fork and its CRC, so the
            # data fork must be finished even when rlen is 0.
            if self.state == _DID_HEADER:
                self.close_data()
            if self.rlen:
                self.read_rsrc(self.rlen)
            self._checkcrc()
        finally:
            self.state = _DID_RSRC
            self.ifp.close()
|
473 |
|
def hexbin(inp, out):
    """(infilename, outfilename) - Decode binhexed file

    If out is empty, the file name stored in the binhex header is used.
    """
    ifp = HexBin(inp)
    done = False
    try:
        finfo = ifp.FInfo
        if not out:
            out = ifp.FName
        if os.name == 'mac':
            ofss = FSSpec(out)
            out = ofss.as_pathname()

        # Data fork; the output is closed even if decoding fails.
        ofp = open(out, 'wb')
        try:
            # XXXX Do translation on non-mac systems
            while True:
                d = ifp.read(128000)
                if not d:
                    break
                ofp.write(d)
        finally:
            ofp.close()
        ifp.close_data()

        # Resource fork: only opened when there is something to write.
        d = ifp.read_rsrc(128000)
        if d:
            ofp = openrsrc(out, 'wb')
            try:
                ofp.write(d)
                while True:
                    d = ifp.read_rsrc(128000)
                    if not d:
                        break
                    ofp.write(d)
            finally:
                ofp.close()

        if os.name == 'mac':
            nfinfo = ofss.GetFInfo()
            nfinfo.Creator = finfo.Creator
            nfinfo.Type = finfo.Type
            nfinfo.Flags = finfo.Flags
            ofss.SetFInfo(nfinfo)

        ifp.close()
        done = True
    finally:
        if not done:
            # Something failed: release the input through the decoder
            # chain directly, since HexBin.close() would first try to
            # verify a CRC.
            ifp.ifp.close()
|
511 |
|
def _test():
    """Round-trip the file named on the command line.

    Encodes <file> to <file>.hqx, decodes that to <file>.viahqx, and exits
    with status 0.  Exits with status 2 and a usage message if no file
    name was given.
    """
    if len(sys.argv) < 2:
        sys.stderr.write('usage: %s filename\n' % sys.argv[0])
        sys.exit(2)
    fname = sys.argv[1]
    binhex(fname, fname+'.hqx')
    hexbin(fname+'.hqx', fname+'.viahqx')
    # Reaching this point means both conversions succeeded.
    sys.exit(0)
|
518 |
|
# When run as a script, round-trip the file named by the first argument.
if __name__ == '__main__':
    _test()