1
0
mirror of https://github.com/clementine-player/Clementine synced 2025-01-08 08:11:36 +01:00
Clementine-audio-player-Mac.../scripts/amazon-covers/amazon_covers.py

155 lines
4.4 KiB
Python

import clementine
from PythonQt.QtCore import QUrl
from PythonQt.QtNetwork import QNetworkRequest
import base64
import hashlib
import hmac
import logging
import time
import urllib
import xml.etree.ElementTree
# Module-level logger shared by everything in this script; all messages are
# emitted under the "amazon_covers" logger name.
LOGGER = logging.getLogger("amazon_covers")
class AmazonCoverProvider(clementine.CoverProvider):
    """
    Cover provider that searches for album art through Amazon's REST API
    (the ``ItemSearch`` operation with the ``Images`` response group).

    Most of the Amazon API related code here comes from a plugin (which I
    wrote) for an open source application called Cardapio.

    NOTE: this script targets Python 2 (it relies on ``urllib.quote``).
    """

    # The host and path are kept separately because they are also part of
    # the string that gets signed in PrepareAmazonRESTUrl().
    API_HOST = 'ecs.amazonaws.com'
    API_PATH = '/onca/xml'
    API_URL = 'http://' + API_HOST + API_PATH + '?{0}'

    # NOTE(review): these credentials are committed in plain text, so anyone
    # with access to this file can sign requests with them. Consider loading
    # them from configuration instead.
    AWS_ACCESS_KEY = 'AKIAJ4QO3GQTSM3A43BQ'
    AWS_SECRET_ACCESS_KEY = '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'

    def __init__(self, parent=None):
        """
        Registers the provider under the name "Amazon" and prepares the
        request arguments shared by every search.
        """
        clementine.CoverProvider.__init__(self, "Amazon", parent)

        # basic API's arguments (search in all categories)
        self.api_base_args = {
            'Service': 'AWSECommerceService',
            'Version': '2009-11-01',
            'Operation': 'ItemSearch',
            'SearchIndex': 'All',
            'ResponseGroup': 'Images',
            'AWSAccessKeyId': self.AWS_ACCESS_KEY,
        }

        self.network = clementine.NetworkAccessManager()

    def StartSearch(self, artist, album, id):
        """
        Sends an asynchronous search request for "<artist> <album>".

        When the network reply finishes, the parsed results are handed to
        SearchFinished() together with the given id. Always returns True.
        """
        query = self.PrepareAmazonRESTUrl(artist + " " + album)
        url = QUrl.fromEncoded(self.API_URL.format(query))
        LOGGER.debug("ID %d: Sending request to '%s'", id, url)

        reply = self.network.get(QNetworkRequest(url))

        def QueryFinished():
            LOGGER.debug("ID %d: Finished", id)
            self.SearchFinished(id, self.ParseReply(reply))

        reply.connect("finished()", QueryFinished)
        return True

    def ParseReply(self, reply):
        """
        Turns a finished network reply into a list of
        clementine.CoverSearchResult objects.

        Never raises: any problem while reading or decoding the response is
        logged and whatever was parsed up to that point (possibly an empty
        list) is returned, so the caller can always report the search as
        finished.
        """
        parsed = []

        # watch out for connection problems
        try:
            xml_body = str(reply.readAll())

            # watch out for empty input
            if not xml_body:
                return parsed

            root = xml.etree.ElementTree.fromstring(xml_body)
            self._strip_namespaces(root)
        except Exception as ex:
            LOGGER.exception(ex)
            return parsed

        # decode the result
        try:
            is_valid = root.find('Items/Request/IsValid')
            total_results = root.find('Items/TotalResults')

            # Compare the elements' *text*; an Element object never equals
            # a string, so comparing the elements themselves accepts
            # everything.
            if is_valid is None or self._text_of(is_valid) == 'False':
                return parsed
            if total_results is None or self._text_of(total_results) == '0':
                return parsed

            keywords = root.find('Items/Request/ItemSearchRequest/Keywords')
            if keywords is None:
                description = ''
            else:
                description = self._native_str(keywords.text or '')

            used_urls = set()

            # remember them all
            for item in root.findall('Items/Item'):
                # Use "is None" rather than truthiness: an Element without
                # children evaluates as false.
                image_url = item.find('LargeImage/URL')
                if image_url is None:
                    image_url = item.find('MediumImage/URL')

                if image_url is None or not image_url.text:
                    continue
                if image_url.text in used_urls:
                    continue
                used_urls.add(image_url.text)

                current = clementine.CoverSearchResult()
                current.description = description
                current.image_url = str(image_url.text)
                parsed.append(current)
        except Exception as ex:
            # This runs inside a signal callback; letting an exception
            # escape would mean SearchFinished() is never called.
            LOGGER.exception(ex)

        return parsed

    def PrepareAmazonRESTUrl(self, text):
        """
        Prepares a RESTful URL according to Amazon's strict querying policies.
        Deals with the variable part of the URL only (the one after the '?').

        The returned string holds the request parameters sorted by name and
        percent-encoded, followed by the '&Signature=' parameter (base64 of
        the HMAC-SHA256 of the canonical request, percent-encoded).
        """
        # additional required API arguments
        copy_args = self.api_base_args.copy()
        copy_args['Keywords'] = self._utf8_bytes(text)
        copy_args['Timestamp'] = time.strftime('%Y-%m-%dT%H:%M:%SZ',
                                               time.gmtime())

        # Sort by parameter name (not by the joined "name=value" string) and
        # encode every value.
        query_string = "&".join(
            "%s=%s" % (name, self._rfc3986_quote(value))
            for name, value in sorted(copy_args.items()))

        # prepare a string on which we will base the AWS signature
        string_to_sign = "\n".join(
            ['GET', self.API_HOST, self.API_PATH, query_string])

        # create HMAC for the string (using SHA-256 and our secret API key)
        hm = hmac.new(key=self.AWS_SECRET_ACCESS_KEY,
                      msg=string_to_sign,
                      digestmod=hashlib.sha256)

        # final step... convert the HMAC to base64, then encode it
        signature = self._rfc3986_quote(base64.b64encode(hm.digest()))

        return query_string + '&Signature=' + signature

    @staticmethod
    def _strip_namespaces(root):
        """Removes the '{namespace}' prefix from every tag below root."""
        # Element.iter() is missing on old interpreters and
        # Element.getiterator() is missing on new ones; use whichever exists.
        walk = getattr(root, 'iter', None) or root.getiterator
        for el in walk():
            _, brace, local_name = el.tag.partition('}')
            if brace:
                el.tag = local_name

    @staticmethod
    def _text_of(element):
        """Returns the element's text with surrounding whitespace removed."""
        return (element.text or '').strip()

    @staticmethod
    def _native_str(value):
        """
        Returns str(value), or value itself when it holds non-ASCII text
        that str() cannot represent on Python 2.
        """
        try:
            return str(value)
        except UnicodeEncodeError:
            # NOTE(review): presumably the binding accepts unicode objects
            # for string properties -- confirm against PythonQt.
            return value

    @staticmethod
    def _utf8_bytes(text):
        """
        Returns text as a byte string suitable for urllib.quote(), encoding
        non-ASCII text as UTF-8 instead of failing.
        """
        try:
            return str(text)
        except UnicodeEncodeError:
            return text.encode('utf-8')

    @staticmethod
    def _rfc3986_quote(value):
        """
        Percent-encodes value leaving only RFC 3986 unreserved characters
        (letters, digits, '-', '_', '.', '~') untouched.

        urllib.quote() keeps '/' by default, which would make the signed
        string differ from the one the server computes for keywords such as
        "AC/DC".
        """
        return urllib.quote(value, safe='~')
# Runs at import time: create a single provider instance and register it with
# Clementine's list of cover providers.
# NOTE(review): the module-level name presumably also keeps the Python object
# referenced for as long as the script is loaded -- confirm before inlining.
provider = AmazonCoverProvider()
clementine.cover_providers.AddProvider(provider)