Added some better content type fixes, a better save, and delete fails... (#19)
* Added some better content type fixes, a better save, and delete fails silently * better unicode method * fix comment, and extract test * only stick to mimetype guess * delete QS unconditionally
This commit is contained in:
parent
9590342799
commit
1014315f9c
@ -14,4 +14,12 @@ class DBFile(models.Model):
|
|||||||
b64 = models.TextField()
|
b64 = models.TextField()
|
||||||
|
|
||||||
def __unicode__(self):
|
def __unicode__(self):
|
||||||
return unicode(self.name)
|
return u"{name} <{content_type}>".format(
|
||||||
|
name=self.name, content_type=self.content_type)
|
||||||
|
|
||||||
|
def save(self, **kwargs):
|
||||||
|
if self.content_type is None:
|
||||||
|
# If content type guessing fails,
|
||||||
|
# use octet stream as a major fallback
|
||||||
|
self.content_type = "application/octet-stream"
|
||||||
|
super(DBFile, self).save(**kwargs)
|
||||||
|
@ -40,26 +40,14 @@ class DBFileStorage(Storage):
|
|||||||
|
|
||||||
:return str: the name(md5) to look up the file by.
|
:return str: the name(md5) to look up the file by.
|
||||||
"""
|
"""
|
||||||
ct = None
|
|
||||||
if hasattr(content.file, "read"):
|
if hasattr(content.file, "read"):
|
||||||
read_data = content.file.read()
|
read_data = content.file.read()
|
||||||
else:
|
else:
|
||||||
read_data = content.file.encode('utf8')
|
read_data = content.file.encode('utf8')
|
||||||
b64 = read_data.encode('base64')
|
b64 = read_data.encode('base64')
|
||||||
|
|
||||||
# Try to get the real content_type if applicable.
|
# USE mimetypes.guess_type as an attempt at getting the content type.
|
||||||
try:
|
ct = mimetypes.guess_type(name)[0]
|
||||||
if hasattr(content, 'content_type'):
|
|
||||||
ct = content.content_type
|
|
||||||
elif hasattr(content.file, 'content_type'):
|
|
||||||
ct = content.file.content_type
|
|
||||||
except AttributeError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# USE mimetypes.guess_type as a fallback.
|
|
||||||
if ct is None:
|
|
||||||
# https://docs.python.org/2/library/mimetypes.html
|
|
||||||
ct = mimetypes.guess_type(name)[0]
|
|
||||||
|
|
||||||
# After we get the mimetype by name potentially, mangle it.
|
# After we get the mimetype by name potentially, mangle it.
|
||||||
name = hashlib.md5(read_data).hexdigest()
|
name = hashlib.md5(read_data).hexdigest()
|
||||||
@ -81,7 +69,9 @@ class DBFileStorage(Storage):
|
|||||||
|
|
||||||
def delete(self, name):
|
def delete(self, name):
|
||||||
assert name, "The name argument is not allowed to be empty."
|
assert name, "The name argument is not allowed to be empty."
|
||||||
DBFile.objects.get(pk=name).delete()
|
# name is the Pk, so it will be unique, deleting on the QS so
|
||||||
|
# that it fails silently if the file doesn't exist.
|
||||||
|
DBFile.objects.filter(pk=name).delete()
|
||||||
|
|
||||||
def exists(self, name):
|
def exists(self, name):
|
||||||
return DBFile.objects.filter(pk=name).exists()
|
return DBFile.objects.filter(pk=name).exists()
|
||||||
|
@ -1,9 +1,11 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
import hashlib
|
import hashlib
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from dbfilestorage.models import DBFile
|
from dbfilestorage.models import DBFile
|
||||||
|
|
||||||
from django.contrib.auth.models import User
|
from django.contrib.auth.models import User
|
||||||
|
from django.core.files.base import ContentFile
|
||||||
from django.core.files.storage import default_storage
|
from django.core.files.storage import default_storage
|
||||||
from django.core.urlresolvers import reverse
|
from django.core.urlresolvers import reverse
|
||||||
from django.test import TestCase, Client
|
from django.test import TestCase, Client
|
||||||
@ -30,6 +32,15 @@ class DBFileTest(TestCase):
|
|||||||
""" Test that the file storage uploads and puts in DB Properly """
|
""" Test that the file storage uploads and puts in DB Properly """
|
||||||
self.assertTrue(DBFile.objects.filter(name=self.md5).exists())
|
self.assertTrue(DBFile.objects.filter(name=self.md5).exists())
|
||||||
|
|
||||||
|
def test_content_file(self):
|
||||||
|
""" Test that this code works with ContentFile as well """
|
||||||
|
content_file = ContentFile(u"ΑΔΔGΕΝΕ")
|
||||||
|
content_file_md5 = hashlib.md5(u"ΑΔΔGΕΝΕ".encode('utf8')).hexdigest()
|
||||||
|
default_storage.save("unicode", content_file)
|
||||||
|
unicode_file = DBFile.objects.get(name=content_file_md5)
|
||||||
|
self.assertEqual(unicode(unicode_file),
|
||||||
|
"{} <application/octet-stream>".format(content_file_md5))
|
||||||
|
|
||||||
def test_no_duplicate_upload(self):
|
def test_no_duplicate_upload(self):
|
||||||
""" Test that it won't make a new file if it already exists """
|
""" Test that it won't make a new file if it already exists """
|
||||||
# uploads once in setup already
|
# uploads once in setup already
|
||||||
@ -40,8 +51,7 @@ class DBFileTest(TestCase):
|
|||||||
""" Test that the DB entry matches what is expected from the file """
|
""" Test that the DB entry matches what is expected from the file """
|
||||||
with open(self.filepath, 'rb') as f:
|
with open(self.filepath, 'rb') as f:
|
||||||
dbf = DBFile.objects.get(name=self.md5)
|
dbf = DBFile.objects.get(name=self.md5)
|
||||||
self.assertEqual(dbf.b64.decode("base64"),
|
self.assertEqual(dbf.b64.decode("base64"), f.read())
|
||||||
f.read())
|
|
||||||
self.assertEqual(dbf.content_type, 'image/jpeg')
|
self.assertEqual(dbf.content_type, 'image/jpeg')
|
||||||
|
|
||||||
def test_open(self):
|
def test_open(self):
|
||||||
@ -59,6 +69,10 @@ class DBFileTest(TestCase):
|
|||||||
self.assertTrue(DBFile.objects.filter(name=self.md5).exists())
|
self.assertTrue(DBFile.objects.filter(name=self.md5).exists())
|
||||||
default_storage.delete(self.md5)
|
default_storage.delete(self.md5)
|
||||||
self.assertFalse(DBFile.objects.filter(name=self.md5).exists())
|
self.assertFalse(DBFile.objects.filter(name=self.md5).exists())
|
||||||
|
# Also test that calling delete on something that doesn't exist,
|
||||||
|
# errors silently
|
||||||
|
self.assertFalse(DBFile.objects.filter(name="Nothing").exists())
|
||||||
|
default_storage.delete("Nothing")
|
||||||
|
|
||||||
def test_path(self):
|
def test_path(self):
|
||||||
""" Test the path is just the md5 name """
|
""" Test the path is just the md5 name """
|
||||||
|
Loading…
Reference in New Issue
Block a user