added adf support via pyadf

This commit is contained in:
Christian Vogelgsang 2011-03-16 19:30:17 +01:00
parent 16bc14acf0
commit 46e1964a1e
5 changed files with 5367 additions and 378 deletions

27
amitools/ADFScanner.py Normal file
View File

@ -0,0 +1,27 @@
"""Scan an ADF image an visit all files"""
from pyadf import Adf, AdfIOException
ST_DIR = 2
ST_FILE = -3
def scan_dir(file_name, visit_func, dir_path, adf):
entries = adf.ls_dir(dir_path)
for entry in entries:
name = entry.fname
if dir_path == "":
path = name
else:
path = dir_path + "/" + name
if entry.ftype == ST_DIR:
scan_dir(file_name, visit_func, path, adf)
elif entry.ftype == ST_FILE:
try:
data = adf.get_file(path)
visit_func(file_name, path, data)
except AdfIOException, info:
pass
def scan_adf(file_name, visit_func):
adf = Adf(file_name, mode='r')
scan_dir(file_name, visit_func, "", adf)

View File

@ -1,6 +1,7 @@
"""A class for reading and writing Amiga executables in Hunk format"""
import struct
import StringIO
HUNK_MIN = 999
HUNK_MAX = 1022
@ -158,392 +159,407 @@ class HunkFile:
else:
return strtab[offset:end]
def is_valid_first_hunk_type(self, hunk_type):
return hunk_type == HUNK_HEADER or hunk_type == HUNK_LIB or hunk_type == HUNK_UNIT \
or hunk_type == HUNK_CODE # strange?
# ----- public functions -----
"""Read a hunk file and build internal hunk structure
Return status and set self.error_string on failure
"""
def read_file(self, hfile):
self.hunks = []
first_hunk = True
was_end = False
with open(hfile) as f:
while True:
hunk_raw_type = self.read_long(f)
if hunk_raw_type == -1:
# eof
break
elif hunk_raw_type < 0:
if first_hunk:
self.error_string = "No valid hunk file: '%s' is too short" % (hfile)
return RESULT_NO_HUNK_FILE
else:
self.error_string = "Error reading hunk type @%08x" % (f.tell())
return RESULT_INVALID_HUNK_FILE
hunk_type = hunk_raw_type & HUNK_TYPE_MASK
hunk_flags = hunk_raw_type & HUNK_FLAGS_MASK
# check range of hunk type
if hunk_type < HUNK_MIN or hunk_type > HUNK_MAX:
# no hunk file?
if first_hunk:
self.error_string = "No valid hunk file: '%s' type was %d" % (hfile, hunk_type)
return RESULT_NO_HUNK_FILE
elif was_end:
# garbage after an end tag is ignored
return RESULT_OK
else:
self.error_string = "Invalid hunk type %d found at @%08x" % (hunk_type,f.tell())
return RESULT_INVALID_HUNK_FILE
return self.read_file_obj(hfile, f)
"""Read a hunk from memory"""
def read_mem(self, name, data):
fobj = StringIO.StringIO(data)
return self.read_file_obj(name, fobj)
def read_file_obj(self, hfile, f):
self.hunks = []
is_first_hunk = True
was_end = False
while True:
# read hunk type
hunk_raw_type = self.read_long(f)
if hunk_raw_type == -1:
# eof
break
elif hunk_raw_type < 0:
if is_first_hunk:
self.error_string = "No valid hunk file: '%s' is too short" % (hfile)
return RESULT_NO_HUNK_FILE
else:
# check for valid first hunk type
if first_hunk:
if hunk_type != HUNK_HEADER:
self.error_string = "No valid hunk file: '%s' first hunk type was %d" % (hfile, hunk_type)
return RESULT_NO_HUNK_FILE
first_hunk = False
was_end = False
hunk = { 'type' : hunk_type }
self.hunks.append(hunk)
hunk['type_name'] = hunk_names[hunk_type - HUNK_MIN]
# set hunk flags
hunk['flags'] = hunk_flags
self.error_string = "Error reading hunk type @%08x" % (f.tell())
return RESULT_INVALID_HUNK_FILE
hunk_type = hunk_raw_type & HUNK_TYPE_MASK
hunk_flags = hunk_raw_type & HUNK_FLAGS_MASK
# check range of hunk type
if hunk_type < HUNK_MIN or hunk_type > HUNK_MAX:
# no hunk file?
if is_first_hunk:
self.error_string = "No valid hunk file: '%s' type was %d" % (hfile, hunk_type)
return RESULT_NO_HUNK_FILE
elif was_end:
# garbage after an end tag is ignored
return RESULT_OK
else:
self.error_string = "Invalid hunk type %d found at @%08x" % (hunk_type,f.tell())
return RESULT_INVALID_HUNK_FILE
else:
# check for valid first hunk type
if is_first_hunk and not self.is_valid_first_hunk_type(hunk_type):
self.error_string = "No valid hunk file: '%s' first hunk type was %d" % (hfile, hunk_type)
return RESULT_INVALID_HUNK_FILE
is_first_hunk = False
was_end = False
hunk = { 'type' : hunk_type }
self.hunks.append(hunk)
hunk['type_name'] = hunk_names[hunk_type - HUNK_MIN]
# set hunk flags
hunk['flags'] = hunk_flags
# V37 fix
if self.v37_compat and hunk_type == HUNK_DREL32:
hunk_type = HUNK_RELOC32SHORT
# V37 fix
if self.v37_compat and hunk_type == HUNK_DREL32:
hunk_type = HUNK_RELOC32SHORT
# ----- HUNK_HEADER -----
if hunk_type == HUNK_HEADER:
names = []
while True:
l,s = self.read_name(f)
if l < 0:
self.error_string = "Error parsing HUNK_HEADER names"
return RESULT_INVALID_HUNK_FILE
elif l == 0:
break
names.append(s)
hunk['names'] = names
# ----- HUNK_HEADER -----
if hunk_type == HUNK_HEADER:
names = []
while True:
l,s = self.read_name(f)
if l < 0:
self.error_string = "Error parsing HUNK_HEADER names"
return RESULT_INVALID_HUNK_FILE
elif l == 0:
break
names.append(s)
hunk['names'] = names
# table size and hunk range
table_size = self.read_long(f)
first_hunk = self.read_long(f)
last_hunk = self.read_long(f)
if table_size < 0 or first_hunk < 0 or last_hunk < 0:
self.error_string = "HUNK_HEADER invalid table_size or first_hunk or last_hunk"
# table size and hunk range
table_size = self.read_long(f)
first_hunk = self.read_long(f)
last_hunk = self.read_long(f)
if table_size < 0 or first_hunk < 0 or last_hunk < 0:
self.error_string = "HUNK_HEADER invalid table_size or first_hunk or last_hunk"
return RESULT_INVALID_HUNK_FILE
hunk['table_size'] = table_size
hunk['first_hunk'] = first_hunk
hunk['last_hunk'] = last_hunk
# determine number of hunks in size table
num_hunks = last_hunk - first_hunk + 1
hunk_table = []
for a in xrange(num_hunks):
hunk_info = {}
hunk_size = self.read_long(f)
if hunk_size < 0:
self.error_string = "HUNK_HEADER contains invalid hunk_size"
return RESULT_INVALID_HUNK_FILE
hunk['table_size'] = table_size
hunk['first_hunk'] = first_hunk
hunk['last_hunk'] = last_hunk
hunk_info['size'] = hunk_size & ~HUNKF_ALL
hunk_info['flags'] = hunk_size & HUNKF_ALL
hunk_table.append(hunk_info)
hunk['hunks'] = hunk_table
# determine number of hunks in size table
num_hunks = last_hunk - first_hunk + 1
hunk_table = []
# ----- HUNK_CODE/HUNK_DATA ------
elif hunk_type == HUNK_CODE or hunk_type == HUNK_DATA:
num_longs = self.read_long(f)
if num_longs < 0:
self.error_string = "%s has invalid size" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
# read in hunk data
size = num_longs * 4
hunk['size'] = size & ~HUNKF_ALL
hunk['flags'] = size & HUNKF_ALL | hunk['flags']
hunk['file_offset'] = f.tell()
data = f.read(hunk['size'])
#hunk['data'] = data
# ---- HUNK_BSS ----
elif hunk_type == HUNK_BSS:
num_longs = self.read_long(f)
if num_longs < 0:
self.error_string = "%s has invalid size" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
# read in hunk data
size = num_longs * 4
hunk['size'] = size & ~HUNKF_ALL
hunk['flags'] = size & HUNKF_ALL | hunk['flags']
# ----- HUNK_<reloc> -----
elif hunk_type == HUNK_RELRELOC32 or hunk_type == HUNK_ABSRELOC16 \
or hunk_type == HUNK_RELRELOC8 or hunk_type == HUNK_RELRELOC16 or hunk_type == HUNK_ABSRELOC32 \
or hunk_type ==HUNK_DREL32 or hunk_type == HUNK_DREL16 or hunk_type == HUNK_DREL8:
num_relocs = 1
reloc = {}
hunk['reloc'] = reloc
while num_relocs != 0:
num_relocs = self.read_long(f)
if num_relocs < 0:
self.error_string = "%s has invalid number of relocations" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
elif num_relocs == 0:
# last relocation found
break
# build reloc map
hunk_num = self.read_long(f)
if hunk_num < 0:
self.error_string = "%s has invalid hunk num" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
offsets = []
for a in xrange(num_relocs & 0xffff):
offset = self.read_long(f)
if offset < 0:
self.error_string = "%s has invalid relocation #%d offset %d (num_relocs=%d hunk_num=%d, offset=%d)" \
% (hunk['type_name'],a,offset,num_relocs,hunk_num,f.tell())
return RESULT_INVALID_HUNK_FILE
offsets.append(offset)
reloc[hunk_num] = offsets
# ---- HUNK_<reloc short> -----
elif hunk_type == HUNK_RELOC32SHORT:
num_relocs = 1
reloc = {}
hunk['reloc'] = reloc
total_words = 0
while num_relocs != 0:
num_relocs = self.read_word(f)
if num_relocs < 0:
self.error_string = "%s has invalid number of relocations" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
elif num_relocs == 0:
# last relocation found
total_words += 1
break
# build reloc map
hunk_num = self.read_word(f)
if hunk_num < 0:
self.error_string = "%s has invalid hunk num" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
offsets = []
count = num_relocs & 0xffff
total_words += count + 2
for a in xrange(count):
offset = self.read_word(f)
if offset < 0:
self.error_string = "%s has invalid relocation #%d offset %d (num_relocs=%d hunk_num=%d, offset=%d)" \
% (hunk['type_name'],a,offset,num_relocs,hunk_num,f.tell())
return RESULT_INVALID_HUNK_FILE
offsets.append(offset)
reloc[hunk_num] = offsets
# padding
if total_words & 1 == 1:
self.read_word(f)
# ----- HUNK_SYMBOL -----
elif hunk_type == HUNK_SYMBOL:
name_len = 1
symbols = []
while name_len > 0:
(name_len, name) = self.read_name(f)
if name_len < 0:
self.error_string = "%s has invalid symbol name" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
elif name_len == 0:
# last name occurred
break
value = self.read_long(f)
if value < 0:
self.error_string = "%s has invalid symbol vlaue" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
symbols.append( (name,value) )
hunk['symbols'] = symbols
# ----- HUNK_DEBUG -----
elif hunk_type == HUNK_DEBUG:
num_longs = self.read_long(f)
if num_longs < 0:
self.error_string = "%s has invalid size" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
size = num_longs * 4
hunk['data'] = f.read(size)
# ----- HUNK_END -----
elif hunk_type == HUNK_END:
was_end = True
# ----- HUNK_OVERLAY -----
elif hunk_type == HUNK_OVERLAY:
tab_size = self.read_long(f)
if tab_size < 0:
self.error_string = "%s has invalid size" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
tab_size += 1
size = tab_size * 4
data = f.read(size)
# ----- HUNK_BREAK -----
elif hunk_type == HUNK_BREAK:
pass
# ----- HUNK_LIB -----
elif hunk_type == HUNK_LIB:
lib_size = self.read_long(f)
# TODO: mark the embedded hunk structure
# ----- HUNK_INDEX -----
elif hunk_type == HUNK_INDEX:
index_size = self.read_long(f)
total_size = index_size * 4
# first read string table
strtab_size = self.read_word(f)
strtab = f.read(strtab_size)
total_size -= strtab_size
# read units
units = []
hunk['units'] = units
while total_size > 2:
# read name of unit
name_offset = self.read_word(f)
total_size -= 2
if name_offset == 0:
break
unit = {}
units.append(unit)
# generate unit name
unit['name'] = self.get_index_name(strtab, name_offset)
# hunks in unit
hunk_begin = self.read_word(f)
num_hunks = self.read_word(f)
total_size -= 4
unit['hunk_begin'] = hunk_begin
unit['num_hunks'] = num_hunks
# for all hunks in unit
ihunks = []
unit['hunks'] = ihunks
for a in xrange(num_hunks):
hunk_info = {}
hunk_size = self.read_long(f)
if hunk_size < 0:
self.error_string = "HUNK_HEADER contains invalid hunk_size"
return RESULT_INVALID_HUNK_FILE
ihunk = {}
ihunks.append(ihunk)
hunk_info['size'] = hunk_size & ~HUNKF_ALL
hunk_info['flags'] = hunk_size & HUNKF_ALL
hunk_table.append(hunk_info)
hunk['hunks'] = hunk_table
# ----- HUNK_CODE/HUNK_DATA ------
elif hunk_type == HUNK_CODE or hunk_type == HUNK_DATA:
num_longs = self.read_long(f)
if num_longs < 0:
self.error_string = "%s has invalid size" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
# read in hunk data
size = num_longs * 4
hunk['size'] = size & ~HUNKF_ALL
hunk['flags'] = size & HUNKF_ALL | hunk['flags']
hunk['file_offset'] = f.tell()
data = f.read(hunk['size'])
#hunk['data'] = data
# ---- HUNK_BSS ----
elif hunk_type == HUNK_BSS:
num_longs = self.read_long(f)
if num_longs < 0:
self.error_string = "%s has invalid size" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
# read in hunk data
size = num_longs * 4
hunk['size'] = size & ~HUNKF_ALL
hunk['flags'] = size & HUNKF_ALL | hunk['flags']
# ----- HUNK_<reloc> -----
elif hunk_type == HUNK_RELRELOC32 or hunk_type == HUNK_ABSRELOC16 \
or hunk_type == HUNK_RELRELOC8 or hunk_type == HUNK_RELRELOC16 or hunk_type == HUNK_ABSRELOC32 \
or hunk_type ==HUNK_DREL32 or hunk_type == HUNK_DREL16 or hunk_type == HUNK_DREL8:
num_relocs = 1
reloc = {}
hunk['reloc'] = reloc
while num_relocs != 0:
num_relocs = self.read_long(f)
if num_relocs < 0:
self.error_string = "%s has invalid number of relocations" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
elif num_relocs == 0:
# last relocation found
break
# build reloc map
hunk_num = self.read_long(f)
if hunk_num < 0:
self.error_string = "%s has invalid hunk num" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
offsets = []
for a in xrange(num_relocs & 0xffff):
offset = self.read_long(f)
if offset < 0:
self.error_string = "%s has invalid relocation #%d offset %d (num_relocs=%d hunk_num=%d, offset=%d)" \
% (hunk['type_name'],a,offset,num_relocs,hunk_num,f.tell())
return RESULT_INVALID_HUNK_FILE
offsets.append(offset)
reloc[hunk_num] = offsets
# ---- HUNK_<reloc short> -----
elif hunk_type == HUNK_RELOC32SHORT:
num_relocs = 1
reloc = {}
hunk['reloc'] = reloc
total_words = 0
while num_relocs != 0:
num_relocs = self.read_word(f)
if num_relocs < 0:
self.error_string = "%s has invalid number of relocations" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
elif num_relocs == 0:
# last relocation found
total_words += 1
break
# build reloc map
hunk_num = self.read_word(f)
if hunk_num < 0:
self.error_string = "%s has invalid hunk num" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
offsets = []
count = num_relocs & 0xffff
total_words += count + 2
for a in xrange(count):
offset = self.read_word(f)
if offset < 0:
self.error_string = "%s has invalid relocation #%d offset %d (num_relocs=%d hunk_num=%d, offset=%d)" \
% (hunk['type_name'],a,offset,num_relocs,hunk_num,f.tell())
return RESULT_INVALID_HUNK_FILE
offsets.append(offset)
reloc[hunk_num] = offsets
# padding
if total_words & 1 == 1:
self.read_word(f)
# ----- HUNK_SYMBOL -----
elif hunk_type == HUNK_SYMBOL:
name_len = 1
symbols = []
while name_len > 0:
(name_len, name) = self.read_name(f)
if name_len < 0:
self.error_string = "%s has invalid symbol name" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
elif name_len == 0:
# last name occurred
break
value = self.read_long(f)
if value < 0:
self.error_string = "%s has invalid symbol vlaue" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
symbols.append( (name,value) )
hunk['symbols'] = symbols
# ----- HUNK_DEBUG -----
elif hunk_type == HUNK_DEBUG:
num_longs = self.read_long(f)
if num_longs < 0:
self.error_string = "%s has invalid size" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
size = num_longs * 4
hunk['data'] = f.read(size)
# ----- HUNK_END -----
elif hunk_type == HUNK_END:
was_end = True
# ----- HUNK_OVERLAY -----
elif hunk_type == HUNK_OVERLAY:
tab_size = self.read_long(f)
if tab_size < 0:
self.error_string = "%s has invalid size" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
tab_size += 1
size = tab_size * 4
data = f.read(size)
# ----- HUNK_BREAK -----
elif hunk_type == HUNK_BREAK:
pass
# ----- HUNK_LIB -----
elif hunk_type == HUNK_LIB:
lib_size = self.read_long(f)
# TODO: mark the embedded hunk structure
# ----- HUNK_INDEX -----
elif hunk_type == HUNK_INDEX:
index_size = self.read_long(f)
total_size = index_size * 4
# first read string table
strtab_size = self.read_word(f)
strtab = f.read(strtab_size)
total_size -= strtab_size
# read units
units = []
hunk['units'] = units
while total_size > 2:
# read name of unit
# get hunk info
name_offset = self.read_word(f)
hunk_size = self.read_word(f)
hunk_type = self.read_word(f)
total_size -= 6
ihunk['name'] = self.get_index_name(strtab, name_offset)
# get number of references
num_refs = self.read_word(f)
total_size -= 2
if name_offset == 0:
break
unit = {}
units.append(unit)
# generate unit name
unit['name'] = self.get_index_name(strtab, name_offset)
# hunks in unit
hunk_begin = self.read_word(f)
num_hunks = self.read_word(f)
total_size -= 4
unit['hunk_begin'] = hunk_begin
unit['num_hunks'] = num_hunks
# for all hunks in unit
ihunks = []
unit['hunks'] = ihunks
for a in xrange(num_hunks):
ihunk = {}
ihunks.append(ihunk)
# get hunk info
name_offset = self.read_word(f)
hunk_size = self.read_word(f)
hunk_type = self.read_word(f)
total_size -= 6
ihunk['name'] = self.get_index_name(strtab, name_offset)
# get number of references
num_refs = self.read_word(f)
total_size -= 2
if num_refs > 0:
refs = []
ihunk['refs'] = refs
for b in xrange(num_refs):
name_offset = self.read_word(f)
total_size -= 2
name = self.get_index_name(strtab, name_offset)
refs.append(name)
num_defs = self.read_word(f)
total_size -= 2
if num_defs > 0:
defs = []
ihunk['defs'] = defs
for b in xrange(num_defs):
name_offset = self.read_word(f)
def_value = self.read_word(f)
def_type = self.read_word(f)
total_size -= 6
name = self.get_index_name(strtab, name_offset)
d = { 'name':name, 'value':def_value,'type':def_type}
defs.append(d)
# align hunk
if total_size == 2:
self.read_word(f)
elif total_size != 0:
self.error_string = "%s has invalid padding" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
# ----- HUNK_EXT -----
elif hunk_type == HUNK_EXT:
exts = []
hunk['exts'] = exts
ext_type_size = 1
while ext_type_size > 0:
ext_type_size = self.read_long(f)
if ext_type_size < 0:
self.error_string = "%s has invalid size" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
ext_type = ext_type_size >> EXT_TYPE_SHIFT
ext_size = ext_type_size & EXT_TYPE_SIZE_MASK
l,ext_name = self.read_name_size(f, ext_size)
if l < 0:
self.error_string = "%s has invalid name" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
elif l == 0:
break
# create local ext object
ext = { 'type' : ext_type, 'name' : ext_name }
exts.append(ext)
# check and setup type name
if not ext_names.has_key(ext_type):
self.error_string = "%s has unspported ext entry %d" % (hunk['type_name'],ext_type)
return RESULT_INVALID_HUNK_FILE
ext['type_name'] = ext_names[ext_type]
# read parameters of type
if ext_type == EXT_ABSCOMMON or ext_type == EXT_RELCOMMON:
ext['common_size'] = self.read_long(f)
elif ext_type == EXT_DEF or ext_type == EXT_ABS or ext_type == EXT_RES:
ext['value'] = self.read_long(f)
else:
num_refs = self.read_long(f)
if num_refs == 0:
num_refs = 1
if num_refs > 0:
refs = []
for a in xrange(num_refs):
ref = self.read_long(f)
refs.append(ref)
ext['refs'] = refs
ihunk['refs'] = refs
for b in xrange(num_refs):
name_offset = self.read_word(f)
total_size -= 2
name = self.get_index_name(strtab, name_offset)
refs.append(name)
num_defs = self.read_word(f)
total_size -= 2
if num_defs > 0:
defs = []
ihunk['defs'] = defs
for b in xrange(num_defs):
name_offset = self.read_word(f)
def_value = self.read_word(f)
def_type = self.read_word(f)
total_size -= 6
name = self.get_index_name(strtab, name_offset)
d = { 'name':name, 'value':def_value,'type':def_type}
defs.append(d)
# align hunk
if total_size == 2:
self.read_word(f)
elif total_size != 0:
self.error_string = "%s has invalid padding" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
# ----- HUNK_EXT -----
elif hunk_type == HUNK_EXT:
exts = []
hunk['exts'] = exts
ext_type_size = 1
while ext_type_size > 0:
ext_type_size = self.read_long(f)
if ext_type_size < 0:
self.error_string = "%s has invalid size" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
ext_type = ext_type_size >> EXT_TYPE_SHIFT
ext_size = ext_type_size & EXT_TYPE_SIZE_MASK
l,ext_name = self.read_name_size(f, ext_size)
if l < 0:
self.error_string = "%s has invalid name" % (hunk['type_name'])
return RESULT_INVALID_HUNK_FILE
elif l == 0:
break
# create local ext object
ext = { 'type' : ext_type, 'name' : ext_name }
exts.append(ext)
# ----- HUNK_UNIT -----
elif hunk_type == HUNK_UNIT:
l,n = self.read_name(f)
hunk['name'] = n
# check and setup type name
if not ext_names.has_key(ext_type):
self.error_string = "%s has unspported ext entry %d" % (hunk['type_name'],ext_type)
return RESULT_INVALID_HUNK_FILE
ext['type_name'] = ext_names[ext_type]
# read parameters of type
if ext_type == EXT_ABSCOMMON or ext_type == EXT_RELCOMMON:
ext['common_size'] = self.read_long(f)
elif ext_type == EXT_DEF or ext_type == EXT_ABS or ext_type == EXT_RES:
ext['value'] = self.read_long(f)
else:
num_refs = self.read_long(f)
if num_refs == 0:
num_refs = 1
refs = []
for a in xrange(num_refs):
ref = self.read_long(f)
refs.append(ref)
ext['refs'] = refs
# ----- HUNK_NAME -----
elif hunk_type == HUNK_NAME:
l,n = self.read_name(f)
hunk['name'] = n
# ----- oops! unsupported hunk -----
else:
self.error_string = "unsupported hunk %d" % (hunk_type)
return RESULT_UNSUPPORTED_HUNKS
# ----- HUNK_UNIT -----
elif hunk_type == HUNK_UNIT:
l,n = self.read_name(f)
hunk['name'] = n
# ----- HUNK_NAME -----
elif hunk_type == HUNK_NAME:
l,n = self.read_name(f)
hunk['name'] = n
# ----- oops! unsupported hunk -----
else:
self.error_string = "unsupported hunk %d" % (hunk_type)
return RESULT_UNSUPPORTED_HUNKS
return RESULT_OK

View File

@ -10,7 +10,8 @@ import os, sys
import argparse
import pprint
import amitools.Hunk as Hunk
from amitools import Hunk
from amitools import ADFScanner
def show_hunks_brief(hunks):
for hunk in hunks:
@ -22,28 +23,45 @@ def dump_hunks(hunks):
# ----- handle paths -----
def handle_file(path):
global args
hf = Hunk.HunkFile()
result = hf.read_file(path)
def check_file(path, hunk_file, result):
if result == Hunk.RESULT_OK:
print path,"OK"
if args.dump:
dump_hunks(hf.hunks)
dump_hunks(hunk_file.hunks)
elif result == Hunk.RESULT_NO_HUNK_FILE:
#print "No:",path,hf.error_string
#print "No:",path,hunk_file.error_string
pass
elif result == Hunk.RESULT_INVALID_HUNK_FILE:
print path,"Invalid:",hf.error_string
print path,"Invalid:",hunk_file.error_string
if args.dump:
dump_hunks(hf.hunks)
dump_hunks(hunk_file.hunks)
sys.exit(1)
elif result == Hunk.RESULT_UNSUPPORTED_HUNKS:
print path,"Unsupported:",hf.error_string
print path,"Unsupported:",hunk_file.error_string
if args.dump:
dump_hunks(hf.hunks)
dump_hunks(hunk_file.hunks)
sys.exit(1)
def handle_adf_file(img_path,file_path,data):
vpath = img_path + ":" + file_path
hf = Hunk.HunkFile()
result = hf.read_mem(vpath, data)
check_file(vpath, hf, result)
def handle_adf(path):
ADFScanner.scan_adf(path, handle_adf_file)
def handle_file(path):
global args
if path.endswith(".adf"):
handle_adf(path)
return
hf = Hunk.HunkFile()
result = hf.read_file(path)
check_file(path, hf, result)
def handle_dir(path):
for root, dirs, files in os.walk(path):
for name in files:

381
pyadf/__init__.py Normal file
View File

@ -0,0 +1,381 @@
#!/bin/env python
# -*- coding: ascii -*-
# vim:ts=4:sw=4:softtabstop=4:smarttab:expandtab
#
import os
import ctypes
import adflib
class AdfBaseException(Exception):
'''Base ADF I/O exception'''
class AdfIOException(AdfBaseException):
'''I/O exception'''
class AdfVersionInfo(object):
"""There are 2 kinds of version information available:
* what verson of ADFLib was adflib.py created with?
* what verson of adflib.dll or adf.so (etc.) are we running with?
"""
def __init__(self):
# what was adflib.py "created" with?
self.headers_version = adflib.ADFLIB_VERSION
self.headers_date = adflib.ADFLIB_DATE
# what version of adflib are we running with?
self.lib_version = adflib.adfGetVersionNumber()
self.lib_date = adflib.adfGetVersionDate()
def __repr__(self):
return repr(self.__dict__)
class _InternalAdfEnv(object):
"""whislt this is NOT a singleton, only create one.
I.e. singleton is not yet enforced
Still naive but doesn't allocate environment unless adf file is opened
clean up happens at (modue) desconstruction time when single_internaladfenv is destroyed.
Should allow multiple adf's to be opened
"""
def __init__(self):
self.adflib_init = False
self.counter = 0
def acquire(self):
# simulate a Semaphore, only one in python lib is in threading module
self.counter += 1
if not self.adflib_init:
adflib.adfEnvInitDefault()
self.adflib_init = True
def release(self):
# do nothing for now, let deconstructor handle this
# may not work with Jython.
self.counter -= 1
pass
def __del__(self):
if not self.adflib_init:
adflib.adfEnvCleanUp()
self.adflib_init = False
_single_internaladfenv = _InternalAdfEnv()
def adf_setup():
_single_internaladfenv.acquire()
def adf_cleanup():
_single_internaladfenv.release()
class AdfFileInfo(object):
"""Right now this is basically an expensive named tuple
"""
def __init__(self, ftype, fsize, fdate, fpath, fname, fcomment=None, fsector=None):
"""fdate is a tuple (year, month, days, hour, mins, secs)
"""
self.ftype = ftype
self.fsize = fsize
self.fdate = fdate
self.fpath = fpath
self.fsector = fsector
self.fname = fname
self.fcomment = fcomment
def __repr__(self):
return repr(self.__dict__)
def pretty_str(self):
"""return pretty output for the file entry"""
date_str = "%4d/%02d/%02d %2d:%02d:%02d" % self.fdate
if self.fpath:
name_str = "%s/"%(self.fpath,)
else:
name_str = ""
if self.ftype == adflib.ST_DIR:
name_str += "%s/"%(self.fname,)
size_str = ' ' * 7
else:
name_str += "%s"%(self.fname,)
size_str = "%7d" %(self.fsize,)
return date_str + ' ' + size_str + ' ' + name_str
def process_entry(vol_ptr, entry_ptr, file_path):
"""Simple print file information (uses print) output goes to stdout
"""
vol = vol_ptr[0]
entry = entry_ptr[0]
# do not print the links entries, ADFlib do not support them yet properly
if entry.type==adflib.ST_LFILE or entry.type==adflib.ST_LDIR or entry.type==adflib.ST_LSOFT:
return
tmp_comment = None
if entry.comment and len(entry.comment)>0:
tmp_comment = str(entry.comment)
#print type(entry.type), type(entry.size), type(entry.year), type(entry.month), type(entry.days), type(entry.hour), type(entry.mins), type(entry.secs), type(file_path), type(entry.name), type(tmp_comment), type(entry.sector)
result = AdfFileInfo(entry.type, entry.size, (entry.year, entry.month, entry.days, entry.hour, entry.mins, entry.secs), file_path, str(entry.name), tmp_comment, entry.sector)
return result
class Adf(object):
"""A pythonic api wrapper around ADFlib
"""
def __init__(self, adf_filename, volnum=0, mode='r'):
self.adf_filename = adf_filename
self.env = None
self.vol = None
self.dev = None
self.volnum = volnum
self.readonly_mode = True
if mode == 'w':
self.readonly_mode = False
self._curdir = '/'
if not os.path.exists(adf_filename):
raise AdfIOException('%s filename does not exist' % adf_filename)
self.open() # kinda nasty doing work in the constructor....
def normpath(self, filepath):
filepath = filepath.replace('\\', '/') # Allow Windows style paths (just in case they slip in)
filepath = filepath.replace(':', '/') # Allow Amiga style paths (just in case they slip in)
return filepath
def splitpath(self, filepath):
filepath = self.normpath(filepath)
result = []
for tmpdir in filepath.split('/'):
if tmpdir:
result.append(tmpdir)
return result
def chdir(self, dirname, ignore_error=False):
vol = self.vol
return self._chdir(dirname, ignore_error=ignore_error)
def _chdir(self, dirname, update_curdir=True, ignore_error=False):
vol = self.vol
dirname = self.normpath(dirname)
if dirname.startswith('/'):
adflib.adfToRootDir(vol)
if update_curdir:
self._curdir = '/'
if dirname == '/':
return True
else:
for tmpdir in dirname.split('/'):
if tmpdir:
result = adflib.adfChangeDir(vol, tmpdir)
if result == -1:
# FAIL
if ignore_error:
return False
else:
raise AdfIOException('error changing directory to %s in %s' % (tmpdir, dirname))
else:
if update_curdir:
self._curdir += dirname+'/'
return True
def ls_dir(self, dirname=None):
vol = self.vol
if dirname:
self._chdir(dirname, update_curdir=False)
else:
self._chdir(self._curdir)
result = []
Entry_Ptr = adflib.POINTER(adflib.Entry)
list = adflib.adfGetDirEnt(vol, vol[0].curDirPtr)
cell = list
while cell:
tmp_content = cell[0].content
tmp_content = adflib.cast(tmp_content, Entry_Ptr)
fentry = process_entry(vol, tmp_content, "")
if fentry:
result.append(fentry)
cell = cell[0].next
adflib.adfFreeDirList(list)
if dirname:
self._chdir(self._curdir)
return result
def get_file(self, filename):
"""return Python string which is the file contents.
NOTE/FIXME filename needs to be a file in the current directory at the moment.
FIXME if a directory name is passed in need to fail!
"""
vol = self.vol
filename = self.normpath(filename)
splitpaths = self.splitpath(filename)
if len(splitpaths) >1:
#if filename.startswith('/'):
# trim leading slash
#filename = filename[1:]
filename = splitpaths[-1]
tmp_dirname = splitpaths[:-1]
tmp_dirname= '/'.join(tmp_dirname)
self._chdir(tmp_dirname, update_curdir=False, ignore_error=False)
file_in_adf = adflib.adfOpenFile(vol, filename, "r");
if not file_in_adf:
# file probably not there
self._chdir(self._curdir)
raise AdfIOException('unable to filename %s for read' % filename)
return
#print 'adffile', repr(file_in_adf)
#print 'type adffile', type(file_in_adf)
#print 'dir adffile', dir(file_in_adf)
#print 'adffile[0]', repr(file_in_adf[0])
#adffile = adffile[0]
tmp_buffer_size = 1024*8
mybuff_type = ctypes.c_ubyte * tmp_buffer_size
mybuff = mybuff_type() ## probably a better way than this
#mybuff_ptr = ctypes.pointer(mybuff)
eof_yet = adflib.adfEndOfFile(file_in_adf)
#print 'eof_yet ', eof_yet
#print 'eof_yet ', type(eof_yet )
tmp_str = []
while not eof_yet:
n = adflib.adfReadFile(file_in_adf, tmp_buffer_size, ctypes.byref(mybuff))
eof_yet = adflib.adfEndOfFile(file_in_adf)
#print 'eof_yet ', eof_yet
#print 'eof_yet ', type(eof_yet )
#print 'n', n
#print 'mybuff', mybuff
#print 'mybuff', dir(mybuff)
#print 'mybuff', str(mybuff)
# FIXME performance of this is poor
for x in mybuff[:n]:
tmp_str.append(chr(x))
#print 'tmp_str', tmp_str
#print 'len tmp_str', len(tmp_str)
adflib.adfCloseFile(file_in_adf)
self._chdir(self._curdir)
return ''.join(tmp_str)
def push_file(self, filename, file_contents):
"""writes file_contents into filename
NOTE/FIXME filename needs to be a file in the current directory at the moment.
FIXME if a directory name is passed in need to fail!
"""
vol = self.vol
filename = self.normpath(filename)
splitpaths = self.splitpath(filename)
if len(splitpaths) >1:
filename = splitpaths[-1]
tmp_dirname = splitpaths[:-1]
tmp_dirname= '/'.join(tmp_dirname)
self._chdir(tmp_dirname, update_curdir=False, ignore_error=False)
file_out_adf = adflib.adfOpenFile(vol, filename, "w");
if not file_out_adf:
# error, bad directory or adf file opened in readonly mode?
self._chdir(self._curdir)
raise AdfIOException('unable to create filename %s for read' % filename)
return
tmp_buffer_size = 1024*8
mybuff_type = ctypes.c_ubyte * tmp_buffer_size
mybuff = mybuff_type() ## probably a better way than this
file_contents_len = len(file_contents)
file_contents_left = file_contents_len
eof_yet = file_contents_left <= 0
counter = 0
while not eof_yet:
tmp_counter = min(tmp_buffer_size, file_contents_left)
mybuff_counter = 0
for x in range(counter, counter + tmp_counter):
mybuff[mybuff_counter] = ord(file_contents[x])
mybuff_counter += 1 # maybe use enumerate instead?
counter += tmp_counter
file_contents_left -= tmp_counter
eof_yet = file_contents_left <= 0
# long adfWriteFile(struct File* file, long n, unsigned char* buffer)
n = adflib.adfWriteFile(file_out_adf, tmp_counter, ctypes.byref(mybuff))
#print 'write', n
adflib.adfCloseFile(file_out_adf)
def unlink(self, filename):
"""delete a file"""
## TODO not opened in write mode check?
vol = self.vol
result = adflib.adfRemoveEntry(vol, vol[0].curDirPtr, filename)
if result == -1:
# FAIL
raise AdfIOException('unlink/delete failed on %s' % (filename))
# else flush? incase of errors later on
def rename(self, old, new):
"""rename a file or directory"""
## TODO not opened in write mode check?
vol = self.vol
result = adflib.adfRenameEntry(vol, vol[0].curDirPtr, old, vol[0].curDirPtr, new)
if result == -1:
# FAIL
raise AdfIOException('rename %s to %s' % (old, new))
# else flush? incase of errors later on
def diskname(self):
"""return the volumn name
Could be a property rather than a function"""
raise NotImplementedError('Adf')
def open(self):
readonly_mode = self.readonly_mode
## not sure about name
if self.env is None:
self.env = True
_single_internaladfenv.acquire()
if not self.dev:
self.dev = adflib.adfMountDev(self.adf_filename, readonly_mode)
if not self.vol:
self.vol = adflib.adfMount(self.dev, self.volnum, readonly_mode)
def close(self):
## not sure about name
if self.vol:
adflib.adfUnMount(self.vol)
self.vol = None
if self.dev:
adflib.adfUnMountDev(self.dev)
self.dev = None
if self.env is not None:
_single_internaladfenv.release()
self.env = None
def __del__(self):
self.close()
def create_empty_adf(adf_filename, diskname='empty', cyl=80, heads=2, sectors=11):
"""Consider implementing as a Adf classmethod?
DD floppy; cyl=80, heads=2, sectors=11
HD floppies have 22 sectors
"""
#raise NotImplementedError('not completed yet, adflib appeared to be missing adfCreateDumpDevice')
if os.path.exists(adf_filename):
raise AdfIOException('%r should not exist!' % adf_filename)
adf_setup()
flop = adflib.adfCreateDumpDevice(adf_filename, cyl, heads, sectors)
if flop is None:
print 'adfCreateFlop error', rc
raise AdfIOException('%s adfCreateDumpDevice failed' % adf_filename)
# create the filesystem : OFS with DIRCACHE
# RETCODE adfCreateFlop(struct Device* dev, char* volName, int volType )
rc = adflib.adfCreateFlop(flop, diskname, adflib.FSMASK_DIRCACHE)
if rc != adflib.RC_OK:
print 'adfCreateFlop error', rc
raise AdfIOException('%s adfCreateFlop failed, %s' % (adf_filename, diskname))
adflib.adfUnMountDev(flop)
adf_cleanup()

4547
pyadf/adflib.py Normal file

File diff suppressed because it is too large Load Diff