# 原始代码作者:Ephylm411
# 参考:https://zhuanlan.zhihu.com/p/623697843
# 改进:Kinotern 与 GPT-5.3 Codex
import os, sys
import struct
import zlib
# Module-level queue of warning messages. warn() appends to it,
# printWarnings() prints its contents, and clearWarnings() replaces it
# with a fresh empty list.
warnings = []
def warn(value):
    """Append *value* to the module-level ``warnings`` queue."""
    warnings.append(value)
def clearWarnings():
    """Reset the module-level ``warnings`` queue to a new empty list."""
    # Rebinds the global name rather than mutating the existing list.
    global warnings
    warnings = []
def printWarnings(sc):
    """Print every queued warning, labelled with *sc*, then empty the queue.

    Args:
        sc: Label printed between the ``[WARNING]`` tag and each message
            (callers in this file pass the scene file's base name).
    """
    for message in warnings:
        print('[WARNING]', sc, message)
    clearWarnings()
def cst2bin(datcst):
    """Unwrap a CST container and return the decompressed scene binary.

    Container layout: 8-byte magic ``b'CatScene'``, uint32 compressed size,
    uint32 decompressed size, then the zlib stream.

    Args:
        datcst: Full contents of a ``.cst`` file as bytes.

    Returns:
        The decompressed payload as bytes.

    Raises:
        Exception: If the magic or either size field does not match.
        struct.error: If *datcst* is shorter than the 16-byte header.
        zlib.error: If the compressed stream is invalid.
    """
    # '<' pins the on-disk little-endian layout; the bare native format
    # would read the size fields byte-swapped on a big-endian host.
    tag, sizcst, sizbin = struct.unpack_from('<8sII', datcst)
    if tag != b'CatScene':
        raise Exception('Label Mismatch')
    payload = datcst[16:]
    if sizcst != len(payload):
        raise Exception('Size Ante Decompress Mismatch')
    datbin = zlib.decompress(payload)
    if sizbin != len(datbin):
        raise Exception('Size Post Decompress Mismatch')
    return datbin
def bin2cst(datbin):
    """Wrap a raw scene binary into the CST container format.

    Args:
        datbin: Decompressed scene payload as bytes.

    Returns:
        ``b'CatScene'`` + uint32 compressed size + uint32 decompressed size
        + the zlib-compressed payload.
    """
    datcst = zlib.compress(datbin)
    # '<' writes the size fields little-endian regardless of the host CPU.
    header = struct.pack('<II', len(datcst), len(datbin))
    return b'CatScene' + header + datcst
class FormatCST:
    """Parsed, editable view of one decompressed CST scene.

    The decompressed payload starts with four little-endian uint32 header
    fields, followed by three regions:

    - b1: section table, a list of (fragment count, first fragment index)
    - b2: fragment offset table, one uint32 offset into b3 per fragment
    - b3: fragment data; each fragment is 0x01, one code byte, a body and
      a terminating 0x00

    Attributes:
        h1: Number of 8-byte entries in b1.
        h2: Size of b1 in bytes (offset at which b2 starts).
        h3: Offset at which b3 starts.
        b1: Raw section table, emitted unchanged by pac().
        n1: Total number of fragments.
        d3: List of raw fragment byte strings (prefix, body, terminator).
        idx, fslc: Cursor state; only present after iter() has been called.
    """

    def __init__(self, fc):
        """Read, decompress and validate a CST file.

        Args:
            fc: Binary file object; ``fc.read()`` must return the whole file.

        Raises:
            Exception: On any integrity violation or truncated content.
        """
        b = cst2bin(fc.read())
        # Explicit '<': the format is little-endian on disk.
        (h0, self.h1, self.h2, self.h3), b = struct.unpack_from('<4I', b), b[16:]
        self.b1, b2, b3 = b[:self.h2], b[self.h2:self.h3], b[self.h3:]
        if h0 != len(b) or self.h1 * 8 != self.h2 or (self.h3 - self.h2) % 4 != 0:
            raise Exception('Integrity Constraint 0 Violated')

        # Each section must start exactly where the previous one ended.
        self.n1 = 0
        for count, first in struct.iter_unpack('<II', self.b1):
            if first != self.n1:
                raise Exception('Integrity Constraint 1 Violated')
            self.n1 += count
        if self.n1 * 4 != self.h3 - self.h2:
            raise Exception('Integrity Constraint 1 Violated')

        d2 = [value for (value,) in struct.iter_unpack('<I', b2)]
        if self.n1 != len(d2):
            raise Exception('Integrity Constraint 2 Violated')

        ofs = 0
        self.d3 = []
        for start in d2:
            if ofs < start:
                # Bytes between two fragments that no offset points at.
                warn('Unaccessible Fragment Offset 0x{0:08X}'.format(ofs))
                ofs = start
            if ofs > start:
                raise Exception('Overflow Offset 0x{0:08X}'.format(ofs))
            try:
                # A fragment is at least three bytes: 0x01, code, and
                # either the terminator or the first body byte.
                d30, d31, _ = struct.unpack_from('3B', b3, ofs)
            except struct.error as e:
                raise Exception('Content Truncated') from e
            if d30 != 0x01:
                raise Exception('Invalid Offset 0x{0:08X}'.format(ofs))
            if d31 not in (0x02, 0x20, 0x21, 0x30):
                warn('Unknown Code 0x01{1:02X} Offset 0x{0:08X}'.format(ofs, d31))
            # The terminator is the first NUL at or after the third byte.
            end = b3.find(b'\x00', ofs + 2)
            if end < 0:
                raise Exception('Content Truncated')
            ofs = end + 1
            self.d3.append(b3[start:ofs])
        if ofs < len(b3):
            warn('Unaccessible Fragment Offset 0x{0:08X}'.format(ofs))

    def iter(self):
        """Reset the cursor so the following next() lands on fragment 0."""
        self.idx = -1
        self.fslc = False

    def next(self, skp = True):
        """Advance the cursor to the next fragment.

        Args:
            skp: When true, skip ahead to the next fragment that is a 0x20
                or 0x21 fragment, or a 0x30 fragment whose body starts with
                ``scene ``. After a 0x30 ``fselect`` fragment has been
                passed, every later fragment is accepted until iter() is
                called again. When false, every fragment is visited.

        Raises:
            StopIteration: When the cursor moves past the last fragment.
        """
        self.idx += 1
        if skp:
            while self.idx < self.n1:
                if self.fslc:
                    break
                frag = self.d3[self.idx]
                d31, = struct.unpack_from('B', frag, 1)
                if d31 in (0x20, 0x21):
                    break
                if d31 == 0x30 and frag[2:8] == b'scene\x20':
                    break
                if d31 == 0x30 and frag[2:] == b'fselect\x00':
                    self.fslc = True
                self.idx += 1
        if self.idx >= self.n1:
            raise StopIteration

    def get(self, skp = True):
        """Return the current fragment's body.

        Args:
            skp: When true, return only the body bytes. When false, return
                the body wrapped in literal ``<\\x01><\\xNN>`` / ``<\\x00>``
                markers, where NN is the code byte in upper-case hex.
        """
        frag = self.d3[self.idx]
        if skp:
            return frag[2:-1]
        code = '{0:02X}'.format(frag[1]).encode('utf-8')
        return b'<\\x01><\\x' + code + b'>' + frag[2:-1] + b'<\\x00>'

    def rep(self, bn):
        """Replace the current fragment's body with *bn*.

        The two prefix bytes are kept and a NUL terminator is re-appended.
        """
        self.d3[self.idx] = self.d3[self.idx][:2] + bn + b'\x00'

    def pac(self):
        """Rebuild the decompressed payload from the current fragments.

        Returns:
            Header + unchanged section table + regenerated offset table +
            concatenated fragment data, as bytes.
        """
        frags = self.d3[:self.n1]
        offsets = []
        ofs = 0
        for frag in frags:
            offsets.append(ofs)
            ofs += len(frag)
        # Build each region once instead of growing bytes with += per item.
        b2 = struct.pack('<{0}I'.format(len(offsets)), *offsets)
        b3 = b''.join(frags)
        b0 = struct.pack('<4I', self.h3 + ofs, self.h1, self.h2, self.h3)
        return b0 + self.b1 + b2 + b3
# Default working directories used by the batch functions
# depacst(), unpacst() and repacst().
pathcst = 'scene_cst'  # input: original .cst files
pathbin = 'scene_bin'  # output of depacst(): decompressed .bin files
pathtxt = 'scene_txt'  # output of unpacst() / input of repacst(): text files
pathdst = 'scene_dst'  # output of repacst(): rebuilt .cst files
# Encoding of the text inside game scripts (commonly CP932 / Shift-JIS for NEKOPARA).
SCENE_ENCODING = 'cp932'
# Encoding of exported TXT files: UTF-8 with BOM (utf-8-sig).
TXT_ENCODING = 'utf-8-sig'
def scene_bytes_to_text(bn):
    """Decode scene bytes *bn* to str using ``SCENE_ENCODING``."""
    return str(bn, SCENE_ENCODING)
def text_to_scene_bytes(st):
    """Encode the str *st* to scene bytes using ``SCENE_ENCODING``.

    Raises:
        UnicodeEncodeError: If *st* has characters the encoding lacks.
    """
    return bytes(st, SCENE_ENCODING)
def cst_to_txt_name(sc_name):
    """Return the text file name for a scene file name.

    The ``.txt`` suffix is appended to the full name: 01.cst -> 01.cst.txt
    """
    return ''.join((sc_name, '.txt'))
def read_txt_lines(st):
    """Read the text file *st* and return its lines without line endings.

    The file is decoded with ``TXT_ENCODING`` first. If that raises
    UnicodeDecodeError, a warning is queued and the file is re-read with
    ``SCENE_ENCODING`` to stay compatible with older CP932 text files.

    Args:
        st: Path of the text file.

    Returns:
        List of str, as produced by ``str.splitlines()``.
    """
    def load(encoding):
        # newline='' disables newline translation; splitlines() does the split.
        with open(st, 'r', encoding = encoding, newline = '') as handle:
            return handle.read().splitlines()

    try:
        return load(TXT_ENCODING)
    except UnicodeDecodeError:
        warn('TXT 非 UTF-8,已回退按 CP932 读取')
        return load(SCENE_ENCODING)
def depacst_file(sc, sb = None):
    """Single-file mode: decompress a ``.cst`` file into a ``.bin`` file.

    Args:
        sc: Path of the source ``.cst`` file.
        sb: Output path; defaults to *sc* with its extension replaced
            by ``.bin``.

    Returns:
        The output path that was written.

    Raises:
        Exception: Propagated from cst2bin() when the container is invalid.
        OSError: If either file cannot be opened, read or written.
    """
    clearWarnings()
    if sb is None:
        sb = os.path.splitext(sc)[0] + '.bin'
    with open(sc, 'rb') as f:
        b = cst2bin(f.read())
    # 'with' closes the output even if write() raises.
    with open(sb, 'wb') as f:
        f.write(b)
    printWarnings(os.path.basename(sc))
    return sb
def unpacst_file(sc, st = None, skp = True):
    """Single-file mode: export the text of a ``.cst`` file to a ``.txt`` file.

    One fragment is written per line, encoded with ``TXT_ENCODING`` and
    terminated by CRLF.

    Args:
        sc: Path of the source ``.cst`` file.
        st: Output path; defaults to ``cst_to_txt_name(sc)``.
        skp: Forwarded to FormatCST.next() and FormatCST.get().

    Returns:
        The output path that was written.

    Raises:
        Exception: Propagated from FormatCST when the scene is invalid.
        UnicodeDecodeError: If a fragment is not valid ``SCENE_ENCODING``.
    """
    clearWarnings()
    if st is None:
        st = cst_to_txt_name(sc)
    with open(sc, 'rb') as f:
        c = FormatCST(f)
    # 'with' closes the output even when decoding a fragment fails;
    # newline='\r\n' turns each written '\n' into CRLF.
    with open(st, 'w', encoding = TXT_ENCODING, newline = '\r\n') as f:
        c.iter()
        while True:
            try:
                c.next(skp)
            except StopIteration:
                break
            f.write(scene_bytes_to_text(c.get(skp)))
            f.write('\n')
    printWarnings(os.path.basename(sc))
    return st
def repacst_file(sc, st = None, sd = None):
    """Single-file mode: rebuild a ``.cst`` from the original plus a ``.txt``.

    Line *k* of the text file replaces the *k*-th fragment selected by
    ``FormatCST.next()`` with its default skipping behaviour.

    Args:
        sc: Path of the original ``.cst`` file.
        st: Path of the text file; defaults to ``cst_to_txt_name(sc)``,
            falling back to the old ``<stem>.txt`` name if only that exists.
        sd: Output path; defaults to *sc* with its extension replaced by
            ``.new.cst``.

    Returns:
        The output path that was written.

    Raises:
        Exception: If a line cannot be encoded with ``SCENE_ENCODING``
            (the message carries the 1-based line number), or propagated
            from FormatCST when the scene is invalid.
    """
    clearWarnings()
    if st is None:
        st = cst_to_txt_name(sc)
        # Backward compatibility with the old naming scheme: 01.txt
        if not os.path.exists(st):
            st_old = os.path.splitext(sc)[0] + '.txt'
            if os.path.exists(st_old):
                warn('检测到旧命名 TXT,建议改为 *.cst.txt')
                st = st_old
    if sd is None:
        sd = os.path.splitext(sc)[0] + '.new.cst'
    with open(sc, 'rb') as f:
        c = FormatCST(f)
    lines = read_txt_lines(st)
    c.iter()
    li = 0
    while True:
        try:
            c.next()
        except StopIteration:
            break
        if li >= len(lines):
            # Fewer text lines than fragments: the rest stay unchanged.
            warn('Lack of Text')
            break
        try:
            bn = text_to_scene_bytes(lines[li])
        except UnicodeEncodeError as e:
            raise Exception('Text Encode Error Line {0}: {1}'.format(li + 1, e)) from e
        c.rep(bn)
        li += 1
    if li < len(lines):
        warn('Unused Text Lines: {0}'.format(len(lines) - li))
    # 'with' closes the output even if packing or writing raises.
    with open(sd, 'wb') as f:
        f.write(bin2cst(c.pac()))
    printWarnings(os.path.basename(sc))
    return sd
def parse_arg():
    """Parse ``sys.argv`` into a ``(tag, src)`` pair.

    Supported command lines:
      - python 1.py
      - python 1.py 0|1|2|3
      - python 1.py xxx.cst
      - python 1.py 0|1|2|3 xxx.cst
      - python 1.py folder
      - python 1.py 0|2|3 folder

    Returns:
        ``(tag, src)``: *tag* is the int mode (0 when absent) and *src* is
        the path argument or None when no path was given.
    """
    args = sys.argv[1:]
    if not args:
        return 0, None
    head = args[0]
    if head not in ('0', '1', '2', '3'):
        # A lone non-mode argument is the path; the mode defaults to 0.
        return 0, head
    return int(head), (args[1] if len(args) >= 2 else None)
def resolve_src(src):
    """Classify the input path *src*.

    *src* is tried as given first, then relative to ``pathcst``.

    Returns:
        ``('file', path)`` or ``('dir', path)`` for the first candidate that
        exists, otherwise ``(None, None)`` (also when *src* is None).
    """
    def classify(path):
        if os.path.isfile(path):
            return ('file', path)
        if os.path.isdir(path):
            return ('dir', path)
        return None

    if src is None:
        return (None, None)
    found = classify(src)
    if found is None:
        # Only fall back to the default scene directory when needed.
        found = classify(os.path.join(pathcst, src))
    if found is None:
        return (None, None)
    return found
def extract_dir(src_dir, skp = True):
    """Batch-export every ``.cst`` in *src_dir* to ``<src_dir>/scene_txt``.

    Per-file failures are printed as ``[ERROR]`` lines and do not stop
    the batch.

    Args:
        src_dir: Directory to scan (not recursive).
        skp: Forwarded to unpacst_file().

    Returns:
        ``(succeeded, total)`` counts of ``.cst`` files.
    """
    names = [entry for entry in os.listdir(src_dir) if entry.endswith('.cst')]
    out_dir = os.path.join(src_dir, 'scene_txt')
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)
    done = 0
    for name in names:
        src_path = os.path.join(src_dir, name)
        txt_path = os.path.join(out_dir, cst_to_txt_name(name))
        try:
            unpacst_file(src_path, txt_path, skp)
        except Exception as e:
            print('[ERROR]', name, e)
        else:
            done += 1
    return (done, len(names))
def depacst_dir(src_dir):
    """Batch-decompress every ``.cst`` in *src_dir* to ``<src_dir>/scene_bin``.

    Per-file failures are printed as ``[ERROR]`` lines and do not stop
    the batch.

    Args:
        src_dir: Directory to scan (not recursive).

    Returns:
        ``(succeeded, total)`` counts of ``.cst`` files.
    """
    names = [entry for entry in os.listdir(src_dir) if entry.endswith('.cst')]
    out_dir = os.path.join(src_dir, 'scene_bin')
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)
    done = 0
    for name in names:
        src_path = os.path.join(src_dir, name)
        bin_path = os.path.join(out_dir, os.path.splitext(name)[0] + '.bin')
        try:
            depacst_file(src_path, bin_path)
        except Exception as e:
            print('[ERROR]', name, e)
        else:
            done += 1
    return (done, len(names))
def depacst():
    """Batch-decompress every ``.cst`` in ``pathcst`` into ``pathbin``.

    Per-file failures are printed as ``[ERROR]`` lines and do not stop
    the batch.

    Returns:
        ``(succeeded, total)`` counts of ``.cst`` files.
    """
    entries = os.listdir(pathcst)
    if not os.path.exists(pathbin):
        os.makedirs(pathbin)
    done = total = 0
    for name in entries:
        if name.endswith('.cst'):
            total += 1
            src_path = os.path.join(pathcst, name)
            # Swap the trailing 'cst' for 'bin', keeping the dot.
            bin_path = os.path.join(pathbin, name[:-3] + 'bin')
            try:
                depacst_file(src_path, bin_path)
            except Exception as e:
                print('[ERROR]', name, e)
            else:
                done += 1
    return (done, total)
def unpacst(skp = True):
    """Batch-export every ``.cst`` in ``pathcst`` to text files in ``pathtxt``.

    Per-file failures are printed as ``[ERROR]`` lines and do not stop
    the batch.

    Args:
        skp: Forwarded to unpacst_file().

    Returns:
        ``(succeeded, total)`` counts of ``.cst`` files.
    """
    entries = os.listdir(pathcst)
    if not os.path.exists(pathtxt):
        os.makedirs(pathtxt)
    done = total = 0
    for name in entries:
        if name.endswith('.cst'):
            total += 1
            src_path = os.path.join(pathcst, name)
            txt_path = os.path.join(pathtxt, cst_to_txt_name(name))
            try:
                unpacst_file(src_path, txt_path, skp)
            except Exception as e:
                print('[ERROR]', name, e)
            else:
                done += 1
    return (done, total)
def repacst():
    """Batch-rebuild ``.cst`` files in ``pathdst`` from ``pathtxt`` texts.

    Each ``.txt`` in ``pathtxt`` is matched to an original in ``pathcst``;
    texts without a matching original are reported and not counted.
    Per-file failures are printed as ``[ERROR]`` lines and do not stop
    the batch.

    Returns:
        ``(succeeded, total)`` counts of texts that had a matching original.
    """
    originals = os.listdir(pathcst)
    texts = os.listdir(pathtxt)
    if not os.path.exists(pathdst):
        os.makedirs(pathdst)
    done = total = 0
    for txt_name in texts:
        if not txt_name.endswith('.txt'):
            continue
        stem = txt_name[:-4]
        # New naming: 01.cst.txt -> 01.cst; old naming: 01.txt -> 01.cst
        cst_name = stem if txt_name.endswith('.cst.txt') else stem + '.cst'
        if cst_name not in originals:
            print('[WARNING] Original CST File Missing: ' + cst_name)
            continue
        total += 1
        src_path = os.path.join(pathcst, cst_name)
        txt_path = os.path.join(pathtxt, txt_name)
        dst_path = os.path.join(pathdst, cst_name)
        try:
            repacst_file(src_path, txt_path, dst_path)
        except Exception as e:
            print('[ERROR]', cst_name, e)
        else:
            done += 1
    return (done, total)
if __name__ == '__main__':
    # Script entry point. The mode (tag) selects the operation:
    #   0 = export text, 1 = repack, 2 = decompress, 3 = export all fragments.
    # The optional path selects single-file, directory or default batch mode.
    tag, src = parse_arg()
    if tag not in (0, 1, 2, 3):
        print('Invalid Parametre')
        sys.exit()
    srct, src = resolve_src(src)
    if src is not None:
        if srct == 'file':
            # Single-file mode.
            try:
                if tag == 0:
                    unpacst_file(src)
                elif tag == 1:
                    repacst_file(src)
                elif tag == 2:
                    depacst_file(src)
                elif tag == 3:
                    unpacst_file(src, skp = False)
                print('1 / 1 completed')
            except Exception as e:
                print('[ERROR]', os.path.basename(src), e)
                sys.exit(1)
            sys.exit()
        if srct == 'dir':
            # Directory mode: repacking (tag 1) is not available here.
            if tag not in (0, 2, 3):
                print('[ERROR] Directory mode only supports tag 0/2/3')
                sys.exit(1)
            if tag == 2:
                s0, s1 = depacst_dir(src)
            else:
                s0, s1 = extract_dir(src, tag == 0)
            print('%d / %d completed' % (s0, s1))
            sys.exit()
    # A path was given on the command line but could not be resolved.
    if len(sys.argv) >= 2 and sys.argv[1] not in ('0', '1', '2', '3'):
        print('[ERROR] CST Path Missing:', sys.argv[1])
        sys.exit(1)
    # Default batch mode over the module-level working directories.
    if tag == 1:
        s0, s1 = repacst()
    elif tag == 2:
        s0, s1 = depacst()
    else:
        s0, s1 = unpacst(tag == 0)
    print('%d / %d completed' % (s0, s1))