add avif compression script
Before Width: | Height: | Size: 151 KiB After Width: | Height: | Size: 5.9 KiB |
Before Width: | Height: | Size: 85 KiB After Width: | Height: | Size: 10 KiB |
Before Width: | Height: | Size: 80 KiB After Width: | Height: | Size: 10 KiB |
Before Width: | Height: | Size: 464 KiB After Width: | Height: | Size: 37 KiB |
63
scripts/imagecompressv2.py
Normal file
|
@ -0,0 +1,63 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
# requires imagemagick, avifenc, svt-av1
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import concurrent.futures
|
||||
|
||||
# below this threshold, it won't be compressed with avif
|
||||
MIN_FILE_SIZE = 30000
|
||||
|
||||
def detect_compressible(fpath):
|
||||
mimetype = subprocess.check_output(["file", "--mime-type", "-b", fpath]).strip().decode('utf-8')
|
||||
|
||||
if mimetype in ('image/png', 'image/jpeg'):
|
||||
return True
|
||||
|
||||
if mimetype == 'image/webp':
|
||||
fileout = subprocess.check_output(["file", fpath]).decode('utf-8')
|
||||
if 'lossless' in fileout:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def compress(fpath):
|
||||
rtpath = '/tmp/'+os.urandom(4).hex()+'.png'
|
||||
subprocess.run(["magick", fpath, rtpath])
|
||||
subprocess.run(["avifenc", rtpath, "--yuv", "420", "--range", "l", "-q", "50", "-c", "svt", "-j", "1", "--speed", "0", "--ignore-exif", "-o", fpath])
|
||||
try:
|
||||
os.remove(rtpath)
|
||||
except:
|
||||
pass
|
||||
|
||||
def process_files_in_directory(directory):
|
||||
compressible_files = []
|
||||
|
||||
for root, _, files in os.walk(directory):
|
||||
for fi in files:
|
||||
full_path = os.path.join(root, fi)
|
||||
print(f"Checking file: {full_path}")
|
||||
if os.path.getsize(full_path) < MIN_FILE_SIZE:
|
||||
continue
|
||||
if detect_compressible(full_path):
|
||||
compressible_files.append(full_path)
|
||||
|
||||
return compressible_files
|
||||
|
||||
def main(directory, max_workers):
|
||||
compressible_files = process_files_in_directory(directory)
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
futures = {executor.submit(compress, fpath): fpath for fpath in compressible_files}
|
||||
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
fpath = futures[future]
|
||||
try:
|
||||
future.result()
|
||||
print(f"Successfully compressed: {fpath}")
|
||||
except Exception as exc:
|
||||
print(f"Compression failed for {fpath}: {exc}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main('../docs/', max_workers=32)
|