-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathzipfile.py
129 lines (108 loc) · 4.12 KB
/
zipfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
import logging
import struct
from micropython import const
from binascii import crc32
from collections import OrderedDict
from zlib import decompress
# Constants
# Values for the `whence` argument of file_obj.seek().
SEEK_SET = const(0)
SEEK_CUR = const(1)
SEEK_END = const(2)
# Window-bits argument for zlib's decompress(). Per the zlib documentation a
# negative value selects a raw deflate stream without zlib header/trailer.
ZIP_WBITS = const(-15)
# Values of the "compression method" header field that this module handles.
COMP_NONE = const(0)  # Member data is stored as-is
COMP_DEF = const(8)  # Member data is deflate-compressed

# ZIP structures
# All formats are little-endian ('<'), so struct applies no padding.

# End of central directory (EOCD) record, read from the end of the file:
# signature, four 16-bit disk/entry-count fields, central directory size and
# offset (32-bit each), archive comment length.
EOCD_SIG = b'PK\x05\x06'
EOCD_STRUCT = '<4s4H2LH'
EOCD_SIZE = struct.calcsize(EOCD_STRUCT)

# Fixed-size part of a central directory file header (one per member); in the
# file it is followed by the variable-length name, extra field and comment.
# ZipInfo.__init__ shows how the individual fields are unpacked.
CD_F_H_SIG = b'PK\x01\x02'
CD_F_H_STRUCT = '<4s4B4H3L5H2L'
CD_F_H_SIZE = struct.calcsize(CD_F_H_STRUCT)

# Fixed-size part of the local file header that precedes each member's data;
# in the file it is followed by the variable-length name and extra field.
LOCAL_F_H_STRUCT = '<4s2B4HL2L2H'
LOCAL_F_H_SIZE = struct.calcsize(LOCAL_F_H_STRUCT)
class BadZipFile(Exception):
    """Raised when an archive is corrupt or uses an unsupported ZIP feature."""
class ZipInfo:
    """Metadata of one archive member, parsed from the central directory.

    The constructor sets compress_method, last_mod_time, last_mod_date,
    crc32, compressed_size, size, filename_len, extra_field_len,
    comment_len and offset (position of the member's local file header).
    ``name`` starts out empty and is filled in by ZipFile.
    """

    def __init__(self, cd_header_data):
        """Parse the fixed-size part of a central directory file header.

        cd_header_data: the CD_F_H_SIZE bytes of the header.

        Raises BadZipFile if the data is too short or does not start with
        the central directory file header signature.
        """
        self.name = ''  # Overridden by ZipFile
        # A short read means the file ends inside the central directory;
        # report that as a corrupt archive instead of letting struct.unpack
        # fail with a low-level, platform-dependent error.
        if len(cd_header_data) < CD_F_H_SIZE:
            raise BadZipFile(
                "Central directory entry truncated, ZIP corrupt?")
        (sig,
         _, _, _, _,  # Compressor and min version, we don't care
         _,  # General purpose bit flag, unused
         self.compress_method,
         self.last_mod_time,
         self.last_mod_date,
         self.crc32,
         self.compressed_size,
         self.size,
         self.filename_len,
         self.extra_field_len,
         self.comment_len,
         _,  # Disk number, we only support single part ZIPs
         _, _,  # File attributes, we don't care
         self.offset) = struct.unpack(CD_F_H_STRUCT, cd_header_data)
        if sig != CD_F_H_SIG:
            raise BadZipFile(
                "Central directory entry signature mismatch, ZIP corrupt?")

    @property
    def compressed(self):
        """True unless the member is stored without compression."""
        return self.compress_method != COMP_NONE

    def __str__(self):
        """Return a short human-readable summary of the member."""
        return "<{} name={}, compressed={}, size={}, offset={}>".format(
            self.__class__.__name__,
            self.name,
            self.compressed,
            self.size,
            self.offset)
class ZipFile:
    """Minimal read-only ZIP archive reader.

    Handles single-disk archives without an archive comment whose members
    are either stored or deflate-compressed. Iterating over an instance
    yields member names; indexing by name returns the member's ZipInfo.
    """

    # Signature that starts every local file header.
    _LOCAL_F_H_SIG = b'PK\x03\x04'

    def __init__(self, file_obj):
        """Index the central directory of the archive in file_obj.

        file_obj: seekable binary file object; it is kept as
            ``self.file_obj`` and reused by read().

        Raises BadZipFile if the end of central directory record is not
        found at the very end of the file, if the archive spans several
        disks, or if a central directory entry is invalid.
        """
        self.file_obj = file_obj
        # The EOCD record is expected to be the last bytes of the file,
        # which only holds when the archive has no trailing comment.
        file_obj.seek(-EOCD_SIZE, SEEK_END)
        (magic_number,
         num_disks,
         _, _,  # Per disk stuff, we don't care
         central_dir_count,
         _,  # Central directory size, unused
         central_dir_offset,
         _,  # Archive comment length, unused
         ) = struct.unpack(EOCD_STRUCT, file_obj.read(EOCD_SIZE))
        if magic_number != EOCD_SIG:
            raise BadZipFile(
                "EOCD contains comment or ZIP corrupt?")
        if num_disks:
            raise BadZipFile(
                "Multipart/disk ZIPs not supported")
        logging.debug("Central dir contains %s entries", central_dir_count)
        # Member name -> ZipInfo, in central directory order
        self.entries = OrderedDict()
        file_obj.seek(central_dir_offset)
        for i in range(central_dir_count):
            logging.debug("Reading CD_F_H %s", i)
            zi = ZipInfo(file_obj.read(CD_F_H_SIZE))
            zi.name = file_obj.read(zi.filename_len).decode()
            self.entries[zi.name] = zi
            # Skip to next entry
            file_obj.seek(zi.extra_field_len + zi.comment_len, SEEK_CUR)

    def __iter__(self):
        """Iterate over the member names in central directory order."""
        yield from self.entries

    def __getitem__(self, k):
        """Return the ZipInfo of the member named k (KeyError if absent)."""
        return self.entries[k]

    def read(self, member):
        """Return the uncompressed contents of one member as bytes.

        member: a member name, or a ZipInfo taken from this archive.

        Raises KeyError for an unknown member name, and BadZipFile if the
        local file header is invalid, the compression method is not
        supported or the CRC32 of the data does not match.
        """
        zip_info = member if isinstance(member, ZipInfo) else self[member]
        file_obj = self.file_obj

        # The name and extra field stored in the local file header may have
        # different lengths than the copies in the central directory, so
        # the data offset must be derived from the local header itself.
        file_obj.seek(zip_info.offset)
        header = file_obj.read(LOCAL_F_H_SIZE)
        if len(header) < LOCAL_F_H_SIZE:
            raise BadZipFile(
                "Local file header truncated for file {}".format(
                    zip_info.name))
        local_fields = struct.unpack(LOCAL_F_H_STRUCT, header)
        if local_fields[0] != self._LOCAL_F_H_SIG:
            raise BadZipFile(
                "Local file header signature mismatch for file {}".format(
                    zip_info.name))
        # The last two fields are the file name and extra field lengths
        filename_len, extra_field_len = local_fields[-2:]
        file_obj.seek(filename_len + extra_field_len, SEEK_CUR)

        # Read actual data, perform decompression if needed. Sizes and CRC
        # come from the central directory entry.
        comp_data = file_obj.read(zip_info.compressed_size)
        if zip_info.compress_method == COMP_DEF:
            # Decompress, not DecompIO because of very bad performance
            uncomp_data = decompress(comp_data, ZIP_WBITS)
        elif zip_info.compress_method == COMP_NONE:
            uncomp_data = comp_data  # Data was just stored, not compressed
        else:
            raise BadZipFile("Unsupported compression method "
                             "for file {}".format(zip_info.name))

        # Validate CRC32
        if crc32(uncomp_data) != zip_info.crc32:
            raise BadZipFile("Bad CRC32 for file {}".format(zip_info.name))
        return uncomp_data