#-*- coding:utf-8 -*-
"""
Originally (c) Sebastiaan Mathot 2011
Modifications (c) 2014, 2018 Martin Paul Eve
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
import sqlite3
import os
import os.path
import sys
import shutil
import sys
import time
from zotero_item import zoteroItem as zotero_item
from debug import Debuggable
class LibZotero(Debuggable):
    """
    Libzotero provides access to the zotero database.
    This is an object oriented reimplementation of the
    original zoterotools.

    The Zotero database is copied to a private file before it is read, and
    its contents are indexed in memory as zotero_item objects keyed by
    item id (see update()).
    """

    # Attachment paths together with the id of the item they are attached to
    # and the id of the attachment item itself.
    attachment_query = u"""
        select items.itemID, itemAttachments.path, itemAttachments.itemID
        from items, itemAttachments
        where items.itemID = itemAttachments.sourceItemID
        """

    # One row per (item, field): item id, field name, field value, item key
    # and item type name.
    info_query = u"""
        select items.itemID, fields.fieldName, itemDataValues.value, items.key, itemTypes.typeName
        from items, itemData, fields, itemDataValues, itemTypes
        where
            items.itemID = itemData.itemID
            and itemData.fieldID = fields.fieldID
            and itemData.valueID = itemDataValues.valueID
            and itemTypes.itemTypeID = items.itemTypeID
        """

    # Collection membership; the "To Read" collection sorts first. The string
    # literal uses single quotes, which is the standard SQL form (SQLite only
    # accepts double-quoted string literals as a compatibility quirk).
    collection_query = u"""
        select items.itemID, collections.collectionName
        from items, collections, collectionItems
        where
            items.itemID = collectionItems.itemID
            and collections.collectionID = collectionItems.collectionID
        order by collections.collectionName != 'To Read',
            collections.collectionName
        """

    # Tag membership.
    tag_query = u"""
        select items.itemID, tags.name
        from items, tags, itemTags
        where
            items.itemID = itemTags.itemID
            and tags.tagID = itemTags.tagID
        """

    # Ids of items that have been moved to the trash.
    deleted_query = u"select itemID from deletedItems"

    # Maps a Zotero field name onto the zotero_item attribute that stores it.
    _FIELD_ATTRIBUTES = {
        u"publicationTitle": "publication",
        u"bookTitle": "publication",
        u"websiteTitle": "publication",
        u"date": "date",
        u"volume": "volume",
        u"issue": "issue",
        u"title": "title",
        u"DOI": "doi",
        u"pages": "pages",
        u"place": "place",
        u"publisher": "publisher",
        u"url": "url",
    }

    # Maps a Zotero creator type onto the zotero_item list that collects it.
    _CREATOR_ATTRIBUTES = (
        ("author", "authors"),
        ("editor", "editors"),
        ("translator", "translators"),
        ("bookAuthor", "book_authors"),
    )

    @staticmethod
    def creator_query(creator_type):
        """
        Builds the query that lists the creators of a given type.

        Arguments:
        creator_type -- The Zotero creator type, e.g. 'author'. The value
                        is interpolated into the SQL text, so it must be a
                        trusted constant and never user input.

        Returns:
        A query string that yields (itemID, lastName, firstName) rows
        ordered by the creator's position on the item.
        """
        return u"""
            select items.itemID, creatorData.lastName, creatorData.firstName
            from items, itemCreators, creators, creatorData, creatorTypes
            where
                items.itemID = itemCreators.itemID
                and itemCreators.creatorID = creators.creatorID
                and creators.creatorDataID = creatorData.creatorDataID
                and itemCreators.creatorTypeID = creatorTypes.creatorTypeID
                and creatorTypes.creatorType == '{0}'
            order by itemCreators.orderIndex
            """.format(creator_type)

    def __init__(self, zotero_path, global_variables, noteProvider=None):
        """
        Initialize zotero.

        Arguments:
        zotero_path -- A unicode string to the Zotero folder.
        global_variables -- An object whose `debug` attribute is used for
                            debug output.

        Keyword arguments:
        noteProvider -- A noteProvider object. (default=None)
        """
        Debuggable.__init__(self, 'libZotero')
        self.gv = global_variables
        self.debug = self.gv.debug
        self.debug.print_debug(self, u"zotero.__init__(): zotero_path = %s" % zotero_path)

        # Set paths
        self.zotero_path = zotero_path
        self.storage_path = os.path.join(self.zotero_path, u"storage")
        self.zotero_database = os.path.join(self.zotero_path, u"zotero.sqlite")
        self.noteProvider = noteProvider

        if os.name == u"nt":
            home_folder = os.environ.get(u"USERPROFILE")
        elif os.name == u"posix":
            home_folder = os.environ.get(u"HOME")
        else:
            self.debug.print_debug(self, u"zotero.__init__(): you appear to be running an unsupported OS")
            home_folder = None
        # Fall back to the generic lookup so that home_folder is always bound
        # (unsupported OS, or the environment variable is missing).
        if home_folder is None:
            home_folder = os.path.expanduser(u"~")
        # Byte strings (Python 2) are decoded; text strings are used as-is.
        if isinstance(home_folder, bytes):
            home_folder = home_folder.decode(sys.getfilesystemencoding())
        self.gnotero_database = os.path.join(home_folder, u".gnotero.sqlite")

        # Remember search results so results speed up over time
        self.search_cache = {}
        # Check whether verbosity is turned on
        self.verbose = "-v" in sys.argv
        # These dates are treated as special and are not parsed into a year
        # representation
        self.special_dates = u"in press", u"submitted", u"in preparation", \
            u"unpublished"
        # These extensions are recognized as fulltext attachments
        self.attachment_ext = u".pdf", u".epub"
        self.index = {}
        self.collection_index = []
        self.tag_index = []
        self.last_update = None

        # Start by updating the database. The --notry parameter can be used
        # to show errors which would otherwise be obscured by the try clause.
        if "--notry" in sys.argv:
            self.search(u"dummy")
            self.error = False
        else:
            try:
                self.search(u"dummy")
                self.error = False
            except Exception as e:
                self.debug.print_debug(self, e)
                self.error = True

    def update(self, force=False):
        """
        Checks if the local copy of the zotero database is up to date. If not,
        the data is also indexed.

        Arguments:
        force -- Indicates that the data should also be indexed, even
                 if the local copy is up to date. (default=False)

        Returns:
        False if the Zotero database could not be inspected, True otherwise.
        """
        try:
            stats = os.stat(self.zotero_database)
        except Exception as e:
            self.debug.print_debug(self, u"zotero.update(): %s" % e)
            return False

        # Only update if necessary: on request, on the first run, or when
        # the database has been modified since the last indexing run.
        modified = stats.st_mtime
        if force or self.last_update is None or modified > self.last_update:
            t = time.time()
            self.last_update = modified
            self.index = {}
            self.collection_index = []
            self.tag_index = []
            self.search_cache = {}
            # Copy the zotero database to the gnotero copy
            shutil.copyfile(self.zotero_database, self.gnotero_database)
            self.conn = sqlite3.connect(self.gnotero_database)
            self.cur = self.conn.cursor()
            try:
                # First create a set of deleted items, so we can ignore
                # those later
                self.cur.execute(self.deleted_query)
                deleted = {row[0] for row in self.cur.fetchall()}
                self._index_fields(deleted)
                self._index_creators(deleted)
                self._index_membership(self.collection_query, "collections",
                                       self.collection_index, deleted)
                self._index_membership(self.tag_query, "tags",
                                       self.tag_index, deleted)
                self._index_attachments(deleted)
            finally:
                # Release the cursor even when indexing fails half-way
                self.cur.close()
            self.debug.print_debug(self, u"zotero.update(): indexing completed in %.3fs" % (time.time() - t))
        return True

    def _item(self, item_id):
        """Returns the indexed item for item_id, creating it when needed."""
        if item_id not in self.index:
            self.index[item_id] = zotero_item(item_id, noteProvider=self.noteProvider)
        return self.index[item_id]

    def _parse_date(self, raw):
        """
        Reduces a Zotero date value to a year or a 'special' date.

        Arguments:
        raw -- The date value as stored in the database.

        Returns:
        One of self.special_dates if it occurs in the value, otherwise the
        first four-character slice of a '/' or '-' separated date, otherwise
        the last four characters of the value. None if a separated date has
        no four-character slice.
        """
        if not raw:
            return raw
        lowered = raw.lower()
        for special in self.special_dates:
            if special in lowered:
                return special
        # Dates can have months, days, and years, or just a year, and can
        # be split by '-' and '/' characters.
        if u'/' in raw:
            separator = u'/'
        elif u'-' in raw:
            separator = u'-'
        else:
            # No separator: just use the last four characters
            return raw[-4:]
        for part in raw.split(separator):
            if len(part) == 4:
                return part
        return None

    def _index_fields(self, deleted):
        """Indexes date, publication, volume, issue, title and similar fields."""
        self.cur.execute(self.info_query)
        for item_id, field_name, value, key, type_name in self.cur.fetchall():
            if item_id in deleted:
                continue
            # Parse date fields, because we only want a year or a 'special'
            # date
            if field_name == u"date":
                value = self._parse_date(value)
            item = self._item(item_id)
            item.key = key
            item.item_type = type_name
            attribute = self._FIELD_ATTRIBUTES.get(field_name)
            if attribute is None:
                self.debug.print_debug(self, u'Unindexed field: {0}'.format(field_name))
            else:
                setattr(item, attribute, value)

    def _index_creators(self, deleted):
        """Indexes authors, editors, translators and book authors."""
        for creator_type, attribute in self._CREATOR_ATTRIBUTES:
            self.cur.execute(self.creator_query(creator_type))
            for row in self.cur.fetchall():
                item_id = row[0]
                if item_id in deleted:
                    continue
                # Slice the row: the first column is the item id, the next
                # two columns are the last name and the first name
                getattr(self._item(item_id), attribute).append(row[1:])

    def _index_membership(self, query, attribute, registry, deleted):
        """
        Indexes a (itemID, name) relation such as collections or tags.

        Arguments:
        query -- The query that yields (itemID, name) rows.
        attribute -- The zotero_item list attribute that receives the name.
        registry -- The list that collects every distinct name, in order of
                    first appearance.
        deleted -- The ids of deleted items, which are skipped.
        """
        seen = set(registry)
        self.cur.execute(query)
        for item_id, name in self.cur.fetchall():
            if item_id in deleted:
                continue
            getattr(self._item(item_id), attribute).append(name)
            if name not in seen:
                seen.add(name)
                registry.append(name)

    def _index_attachments(self, deleted):
        """Indexes the fulltext attachment of every item."""
        extensions = tuple(self.attachment_ext)
        self.cur.execute(self.attachment_query)
        for item_id, path, attachment_id in self.cur.fetchall():
            if item_id in deleted or path is None:
                continue
            # If the attachment is stored in the Zotero folder, it is
            # preceded by "storage:"
            if path.startswith(u"storage:"):
                file_name = path[8:]
                # Compare whole suffixes so that extensions of any length
                # (e.g. ".epub") are recognized
                if not file_name.lower().endswith(extensions):
                    continue
                self.cur.execute(
                    u"select items.key from items where itemID = ?",
                    (attachment_id,))
                row = self.cur.fetchone()
                if row is None:
                    continue
                self._item(item_id).fulltext = os.path.join(
                    self.storage_path, row[0], file_name)
            # If the attachment is linked, it is simply the full path to
            # the attachment
            else:
                self._item(item_id).fulltext = path

    def parse_query(self, query):
        """
        Parses a text search query into a list of tuples, which are acceptable
        for zotero_item.match().

        Argument:
        query -- A search query.

        Returns:
        A list of (criterium, value) tuples. The criterium is None for terms
        that are not type-specified.
        """
        # Make sure that spaces are handled correctly after colons.
        # E.g., Author: Mathot
        while u": " in query:
            query = query.replace(u": ", u":")
        terms = []
        for term in query.strip().lower().split():
            # Check if the criterium is type-specified, like "author:doe"
            parts = term.split(u":")
            if len(parts) == 2:
                terms.append((parts[0], parts[1]))
            else:
                terms.append((None, term))
        return terms

    def search(self, query):
        """
        Searches the zotero database.

        Argument:
        query -- A search query.

        Returns:
        A list of zotero_items. The list is empty when the database could
        not be updated.
        """
        if not self.update():
            return []
        if query in self.search_cache:
            return self.search_cache[query]
        terms = self.parse_query(query)
        results = [item for item in self.index.values() if item.match(terms)]
        self.search_cache[query] = results
        return results
def valid_location(path):
    """
    Tells whether a folder looks like a Zotero folder, i.e., whether it
    contains an entry named zotero.sqlite.

    Arguments:
    path -- The path to check.

    Returns:
    True if path is a valid Zotero folder, False otherwise.
    """
    database = os.path.join(path, u"zotero.sqlite")
    return os.path.exists(database)