Compare commits

...

25 Commits

Author SHA1 Message Date
github-actions 4a8d1906f9 Release (nightly)
Created by:

:ci skip all
2024-06-12 01:48:25 +00:00
Paper 3e38ea12db [VidLii] Add 720p support (#30924)
* [VidLii] Add HD support  (yt-dlp backport-ish)

* Also fix a bug with the view count

---------

Co-authored-by: dirkf <fieldhouse@gmx.net>
2024-06-12 01:47:57 +00:00
dirkf 3db3ffc859 [ORF] Skip tests with limited availability 2024-06-12 01:47:57 +00:00
dirkf 7b510e4f4d [ORF] Re-factor and update`ORFFM4StoryIE`
* fix getting media via DASH instead of inaccessible mp4
* also get in-page YT media
2024-06-12 01:47:57 +00:00
dirkf ce7f1813dd [ORF] Support sound.orf.at, updating `ORFRadioIE`
* maintain support for xx.orf.at/player/... URLs
* add `ORFRadioCollectionIE` to support playlists in ORF Sound
* back-port and re-work `ORFPodcastIE` from https://github.com/yt-dlp/yt-dlp/pull/8486, thx Esokrates
2024-06-12 01:47:57 +00:00
dirkf ac87af192d [ORF] Support on.orf.at, replacing `ORFTVthekIE`
* add `ORFONIE`, back-porting yt-dlp PR https://github.com/yt-dlp/yt-dlp/pull/9113 and friends: thx HobbyistDev, TuxCoder, seproDev
* re-factor to support livestreams via new `ORFONliveIE`
2024-06-12 01:47:57 +00:00
dirkf b5fb97a711 [test] Improve download test
* skip reason can't be unicode in Py2
* remove duplicate assert...Equal functions
2024-06-12 01:47:57 +00:00
dirkf bfe962ff50 [core] Re-factor with `_fill_common_fields()` as used in yt-dlp 2024-06-12 01:47:57 +00:00
dirkf c3928f9122 [core] Safer handling of nested playlist data 2024-06-12 01:47:57 +00:00
kmnx 5a52f5f079 [mixcloud] updated mixcloud API server address (#32557)
* updated mixcloud API server address
* fix tests
* etc

---------

Co-authored-by: dirkf <fieldhouse@gmx.net>
2024-06-12 01:47:57 +00:00
github-actions 4346d68237 Release (nightly)
Created by:

:ci skip all
2024-05-31 01:46:23 +00:00
dirkf 10bf86fd2e [InfoExtractor] Misc yt-dlp back-ports, etc
* add _yes_playlist() method
* avoid crash using _NETRC_MACHINE
* use _search_json() in _search_nextjs_data()
* _search_nextjs_data() default is JSON, not text
* test for above
2024-05-31 01:45:57 +00:00
dirkf 045c63f951 [compat] Avoid type comparison in `compat_ord`
NB This isn't actually a compat fn; it should be utils.int_from_int_or_char
2024-05-31 01:45:57 +00:00
dirkf 9efa2a0d55 [utils] Split out traversal.py dummy and traversal tests 2024-05-31 01:45:57 +00:00
dirkf 563a8549ae [compat] Improve compat_etree_iterfind for Py2.6
Adapted from https://raw.githubusercontent.com/python/cpython/2.7/Lib/xml/etree/ElementPath.py
2024-05-31 01:45:57 +00:00
dirkf a093e0ac1c [utils] Update traverse_obj() from yt-dlp
* remove `is_user_input` option per https://github.com/yt-dlp/yt-dlp/pull/8673
* support traversal of compat_xml_etree_ElementTree_Element per https://github.com/yt-dlp/yt-dlp/pull/8911
* allow un/branching using all and any per https://github.com/yt-dlp/yt-dlp/pull/9571
* support traversal of compat_cookies.Morsel and multiple types in `set()` keys per https://github.com/yt-dlp/yt-dlp/pull/9577
thx Grub4k for these
* also, move traversal tests to a separate class
* allow for unordered dicts in tests for Py<3.7
2024-05-31 01:45:57 +00:00
github-actions 90cac9424d Release (nightly)
Created by: dirkf

:ci skip all
2024-05-16 18:18:31 +00:00
dirkf 6d8de7ccf7
[workflows/build.yml] Temporary workaround for Python 3.4/5 failures
https://github.com/actions/setup-python/issues/866
2024-05-16 19:07:15 +01:00
github-actions cf9bccb0e5 Release (nightly)
Created by:

:ci skip all
2024-05-16 01:44:06 +00:00
dirkf 095dc798b2 [workflows/ci.yml] Temporary workaround for Python 3.5 _pip_ failures
https://github.com/actions/setup-python/issues/866
2024-05-16 01:43:40 +00:00
github-actions aefc38e78f Release (nightly)
Created by:

:ci skip all
2024-04-22 01:42:45 +00:00
dirkf b163d08481 [YouPorn] Add playlist extractors
* YouPornCategoryIE
* YouPornChannelIE
* YouPornCollectionIE
* YouPornStarIE
* YouPornTagIE
* YouPornVideosIE,
2024-04-22 01:42:16 +00:00
dirkf bf3b23ebb6 [YouPorn] Improve extraction
* detect unwatchable videos
* improve duration extraction
* fix count extraction and support large values
* detect and remove SEO spam boilerplate description
2024-04-22 01:42:16 +00:00
dirkf d2a092db42 [test/test_download] Support 'playlist_maxcount:count' expected value
* parallel to `playlist_mincount'
* specify both for a range of playlist lengths
* if max < min the test will always fail!
2024-04-22 01:42:16 +00:00
dirkf 3357276467 [YouPorn] Incorporate yt-dlp PR 8827
* from https://github.com/yt-dlp/yt-dlp/pull/8827
* extract from webpage instead of broken API URL
* thx The-MAGI
2024-04-22 01:42:16 +00:00
23 changed files with 2236 additions and 1041 deletions

View File

@ -18,7 +18,7 @@ title: ''
<!--
Carefully read and work through this check list in order to prevent the most common mistakes and misuse of youtube-dl:
- First of, make sure you are using the latest version of youtube-dl. Run `youtube-dl --version` and ensure your version is 2024.04.08. If it's not, see https://yt-dl.org/update on how to update. Issues with outdated version will be REJECTED.
- First of, make sure you are using the latest version of youtube-dl. Run `youtube-dl --version` and ensure your version is 2024.06.12. If it's not, see https://yt-dl.org/update on how to update. Issues with outdated version will be REJECTED.
- Make sure that all provided video/audio/playlist URLs (if any) are alive and playable in a browser.
- Make sure that all URLs and arguments with special characters are properly quoted or escaped as explained in http://yt-dl.org/escape.
- Search the bugtracker for similar issues: http://yt-dl.org/search-issues. DO NOT post duplicates.
@ -26,7 +26,7 @@ Carefully read and work through this check list in order to prevent the most com
-->
- [ ] I'm reporting a broken site support
- [ ] I've verified that I'm running youtube-dl version **2024.04.08**
- [ ] I've verified that I'm running youtube-dl version **2024.06.12**
- [ ] I've checked that all provided URLs are alive and playable in a browser
- [ ] I've checked that all URLs and arguments with special characters are properly quoted or escaped
- [ ] I've searched the bugtracker for similar issues including closed ones
@ -41,7 +41,7 @@ Add the `-v` flag to your command line you run youtube-dl with (`youtube-dl -v <
[debug] User config: []
[debug] Command-line args: [u'-v', u'http://www.youtube.com/watch?v=BaW_jenozKcj']
[debug] Encodings: locale cp1251, fs mbcs, out cp866, pref cp1251
[debug] youtube-dl version 2024.04.08
[debug] youtube-dl version 2024.06.12
[debug] Python version 2.7.11 - Windows-2003Server-5.2.3790-SP2
[debug] exe versions: ffmpeg N-75573-g1d0487f, ffprobe N-75573-g1d0487f, rtmpdump 2.4
[debug] Proxy map: {}

View File

@ -19,7 +19,7 @@ labels: 'site-support-request'
<!--
Carefully read and work through this check list in order to prevent the most common mistakes and misuse of youtube-dl:
- First of, make sure you are using the latest version of youtube-dl. Run `youtube-dl --version` and ensure your version is 2024.04.08. If it's not, see https://yt-dl.org/update on how to update. Issues with outdated version will be REJECTED.
- First of, make sure you are using the latest version of youtube-dl. Run `youtube-dl --version` and ensure your version is 2024.06.12. If it's not, see https://yt-dl.org/update on how to update. Issues with outdated version will be REJECTED.
- Make sure that all provided video/audio/playlist URLs (if any) are alive and playable in a browser.
- Make sure that site you are requesting is not dedicated to copyright infringement, see https://yt-dl.org/copyright-infringement. youtube-dl does not support such sites. In order for site support request to be accepted all provided example URLs should not violate any copyrights.
- Search the bugtracker for similar site support requests: http://yt-dl.org/search-issues. DO NOT post duplicates.
@ -27,7 +27,7 @@ Carefully read and work through this check list in order to prevent the most com
-->
- [ ] I'm reporting a new site support request
- [ ] I've verified that I'm running youtube-dl version **2024.04.08**
- [ ] I've verified that I'm running youtube-dl version **2024.06.12**
- [ ] I've checked that all provided URLs are alive and playable in a browser
- [ ] I've checked that none of provided URLs violate any copyrights
- [ ] I've searched the bugtracker for similar site support requests including closed ones

View File

@ -18,13 +18,13 @@ title: ''
<!--
Carefully read and work through this check list in order to prevent the most common mistakes and misuse of youtube-dl:
- First of, make sure you are using the latest version of youtube-dl. Run `youtube-dl --version` and ensure your version is 2024.04.08. If it's not, see https://yt-dl.org/update on how to update. Issues with outdated version will be REJECTED.
- First of, make sure you are using the latest version of youtube-dl. Run `youtube-dl --version` and ensure your version is 2024.06.12. If it's not, see https://yt-dl.org/update on how to update. Issues with outdated version will be REJECTED.
- Search the bugtracker for similar site feature requests: http://yt-dl.org/search-issues. DO NOT post duplicates.
- Finally, put x into all relevant boxes (like this [x])
-->
- [ ] I'm reporting a site feature request
- [ ] I've verified that I'm running youtube-dl version **2024.04.08**
- [ ] I've verified that I'm running youtube-dl version **2024.06.12**
- [ ] I've searched the bugtracker for similar site feature requests including closed ones

View File

@ -18,7 +18,7 @@ title: ''
<!--
Carefully read and work through this check list in order to prevent the most common mistakes and misuse of youtube-dl:
- First of, make sure you are using the latest version of youtube-dl. Run `youtube-dl --version` and ensure your version is 2024.04.08. If it's not, see https://yt-dl.org/update on how to update. Issues with outdated version will be REJECTED.
- First of, make sure you are using the latest version of youtube-dl. Run `youtube-dl --version` and ensure your version is 2024.06.12. If it's not, see https://yt-dl.org/update on how to update. Issues with outdated version will be REJECTED.
- Make sure that all provided video/audio/playlist URLs (if any) are alive and playable in a browser.
- Make sure that all URLs and arguments with special characters are properly quoted or escaped as explained in http://yt-dl.org/escape.
- Search the bugtracker for similar issues: http://yt-dl.org/search-issues. DO NOT post duplicates.
@ -27,7 +27,7 @@ Carefully read and work through this check list in order to prevent the most com
-->
- [ ] I'm reporting a broken site support issue
- [ ] I've verified that I'm running youtube-dl version **2024.04.08**
- [ ] I've verified that I'm running youtube-dl version **2024.06.12**
- [ ] I've checked that all provided URLs are alive and playable in a browser
- [ ] I've checked that all URLs and arguments with special characters are properly quoted or escaped
- [ ] I've searched the bugtracker for similar bug reports including closed ones
@ -43,7 +43,7 @@ Add the `-v` flag to your command line you run youtube-dl with (`youtube-dl -v <
[debug] User config: []
[debug] Command-line args: [u'-v', u'http://www.youtube.com/watch?v=BaW_jenozKcj']
[debug] Encodings: locale cp1251, fs mbcs, out cp866, pref cp1251
[debug] youtube-dl version 2024.04.08
[debug] youtube-dl version 2024.06.12
[debug] Python version 2.7.11 - Windows-2003Server-5.2.3790-SP2
[debug] exe versions: ffmpeg N-75573-g1d0487f, ffprobe N-75573-g1d0487f, rtmpdump 2.4
[debug] Proxy map: {}

View File

@ -19,13 +19,13 @@ labels: 'request'
<!--
Carefully read and work through this check list in order to prevent the most common mistakes and misuse of youtube-dl:
- First of, make sure you are using the latest version of youtube-dl. Run `youtube-dl --version` and ensure your version is 2024.04.08. If it's not, see https://yt-dl.org/update on how to update. Issues with outdated version will be REJECTED.
- First of, make sure you are using the latest version of youtube-dl. Run `youtube-dl --version` and ensure your version is 2024.06.12. If it's not, see https://yt-dl.org/update on how to update. Issues with outdated version will be REJECTED.
- Search the bugtracker for similar feature requests: http://yt-dl.org/search-issues. DO NOT post duplicates.
- Finally, put x into all relevant boxes (like this [x])
-->
- [ ] I'm reporting a feature request
- [ ] I've verified that I'm running youtube-dl version **2024.04.08**
- [ ] I've verified that I'm running youtube-dl version **2024.06.12**
- [ ] I've searched the bugtracker for similar feature requests including closed ones

View File

@ -127,6 +127,8 @@ jobs:
needs: build_unix
env:
PYCRYPTO: pycrypto-2.6.1-cp34-none-win32
# Temporary workaround for Python 3.4/5 failures - May 2024
PIP_TRUSTED_HOST: "pypi.python.org pypi.org files.pythonhosted.org"
outputs:
sha256_win: ${{ steps.sha256_win.outputs.sha256_win }}
sha512_win: ${{ steps.sha512_win.outputs.sha512_win }}

View File

@ -159,6 +159,9 @@ jobs:
# wrap broken actions/setup-python@v4
# NB may run apt-get install in Linux
uses: ytdl-org/setup-python@v1
env:
# Temporary workaround for Python 3.5 failures - May 2024
PIP_TRUSTED_HOST: "pypi.python.org pypi.org files.pythonhosted.org"
with:
python-version: ${{ matrix.python-version }}
cache-build: true

View File

@ -5,9 +5,9 @@ import hashlib
import json
import os.path
import re
import types
import ssl
import sys
import types
import unittest
import youtube_dl.extractor
@ -181,18 +181,18 @@ def expect_value(self, got, expected, field):
op, _, expected_num = expected.partition(':')
expected_num = int(expected_num)
if op == 'mincount':
assert_func = assertGreaterEqual
assert_func = self.assertGreaterEqual
msg_tmpl = 'Expected %d items in field %s, but only got %d'
elif op == 'maxcount':
assert_func = assertLessEqual
assert_func = self.assertLessEqual
msg_tmpl = 'Expected maximum %d items in field %s, but got %d'
elif op == 'count':
assert_func = assertEqual
assert_func = self.assertEqual
msg_tmpl = 'Expected exactly %d items in field %s, but got %d'
else:
assert False
assert_func(
self, len(got), expected_num,
len(got), expected_num,
msg_tmpl % (expected_num, field, len(got)))
return
self.assertEqual(
@ -262,27 +262,6 @@ def assertRegexpMatches(self, text, regexp, msg=None):
self.assertTrue(m, msg)
def assertGreaterEqual(self, got, expected, msg=None):
if not (got >= expected):
if msg is None:
msg = '%r not greater than or equal to %r' % (got, expected)
self.assertTrue(got >= expected, msg)
def assertLessEqual(self, got, expected, msg=None):
if not (got <= expected):
if msg is None:
msg = '%r not less than or equal to %r' % (got, expected)
self.assertTrue(got <= expected, msg)
def assertEqual(self, got, expected, msg=None):
if not (got == expected):
if msg is None:
msg = '%r not equal to %r' % (got, expected)
self.assertTrue(got == expected, msg)
def expect_warnings(ydl, warnings_re):
real_warning = ydl.report_warning

View File

@ -153,6 +153,9 @@ class TestInfoExtractor(unittest.TestCase):
'''
search = self.ie._search_nextjs_data(html, 'testID')
self.assertEqual(search['props']['pageProps']['video']['id'], 'testid')
search = self.ie._search_nextjs_data(
'no next.js data here, move along', 'testID', default={'status': 0})
self.assertEqual(search['status'], 0)
def test_search_nuxt_data(self):
html = '''

View File

@ -9,7 +9,6 @@ import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from test.helper import (
assertGreaterEqual,
expect_warnings,
get_params,
gettestcases,
@ -35,12 +34,20 @@ from youtube_dl.utils import (
ExtractorError,
error_to_compat_str,
format_bytes,
IDENTITY,
preferredencoding,
UnavailableVideoError,
)
from youtube_dl.extractor import get_info_extractor
RETRIES = 3
# Some unittest APIs require actual str
if not isinstance('TEST', str):
_encode_str = lambda s: s.encode(preferredencoding())
else:
_encode_str = IDENTITY
class YoutubeDL(youtube_dl.YoutubeDL):
def __init__(self, *args, **kwargs):
@ -101,7 +108,7 @@ def generator(test_case, tname):
def print_skipping(reason):
print('Skipping %s: %s' % (test_case['name'], reason))
self.skipTest(reason)
self.skipTest(_encode_str(reason))
if not ie.working():
print_skipping('IE marked as not _WORKING')
@ -122,7 +129,10 @@ def generator(test_case, tname):
params['outtmpl'] = tname + '_' + params['outtmpl']
if is_playlist and 'playlist' not in test_case:
params.setdefault('extract_flat', 'in_playlist')
params.setdefault('playlistend', test_case.get('playlist_mincount'))
params.setdefault('playlistend',
test_case['playlist_maxcount'] + 1
if test_case.get('playlist_maxcount')
else test_case.get('playlist_mincount'))
params.setdefault('skip_download', True)
ydl = YoutubeDL(params, auto_init=False)
@ -183,13 +193,19 @@ def generator(test_case, tname):
expect_info_dict(self, res_dict, test_case.get('info_dict', {}))
if 'playlist_mincount' in test_case:
assertGreaterEqual(
self,
self.assertGreaterEqual(
len(res_dict['entries']),
test_case['playlist_mincount'],
'Expected at least %d in playlist %s, but got only %d' % (
test_case['playlist_mincount'], test_case['url'],
len(res_dict['entries'])))
if 'playlist_maxcount' in test_case:
self.assertLessEqual(
len(res_dict['entries']),
test_case['playlist_maxcount'],
'Expected at most %d in playlist %s, but got %d' % (
test_case['playlist_maxcount'], test_case['url'],
len(res_dict['entries'])))
if 'playlist_count' in test_case:
self.assertEqual(
len(res_dict['entries']),
@ -231,8 +247,8 @@ def generator(test_case, tname):
if params.get('test'):
expected_minsize = max(expected_minsize, 10000)
got_fsize = os.path.getsize(tc_filename)
assertGreaterEqual(
self, got_fsize, expected_minsize,
self.assertGreaterEqual(
got_fsize, expected_minsize,
'Expected %s to be at least %s, but it\'s only %s ' %
(tc_filename, format_bytes(expected_minsize),
format_bytes(got_fsize)))

509
test/test_traversal.py Normal file
View File

@ -0,0 +1,509 @@
#!/usr/bin/env python
# coding: utf-8
from __future__ import unicode_literals
# Allow direct execution
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import re
from youtube_dl.traversal import (
dict_get,
get_first,
T,
traverse_obj,
)
from youtube_dl.compat import (
compat_etree_fromstring,
compat_http_cookies,
compat_str,
)
from youtube_dl.utils import (
int_or_none,
str_or_none,
)
_TEST_DATA = {
100: 100,
1.2: 1.2,
'str': 'str',
'None': None,
'...': Ellipsis,
'urls': [
{'index': 0, 'url': 'https://www.example.com/0'},
{'index': 1, 'url': 'https://www.example.com/1'},
],
'data': (
{'index': 2},
{'index': 3},
),
'dict': {},
}
if sys.version_info < (3, 0):
class _TestCase(unittest.TestCase):
def assertCountEqual(self, *args, **kwargs):
return self.assertItemsEqual(*args, **kwargs)
else:
_TestCase = unittest.TestCase
class TestTraversal(_TestCase):
def assertMaybeCountEqual(self, *args, **kwargs):
if sys.version_info < (3, 7):
# random dict order
return self.assertCountEqual(*args, **kwargs)
else:
return self.assertEqual(*args, **kwargs)
def test_traverse_obj(self):
# instant compat
str = compat_str
# define a pukka Iterable
def iter_range(stop):
for from_ in range(stop):
yield from_
# Test base functionality
self.assertEqual(traverse_obj(_TEST_DATA, ('str',)), 'str',
msg='allow tuple path')
self.assertEqual(traverse_obj(_TEST_DATA, ['str']), 'str',
msg='allow list path')
self.assertEqual(traverse_obj(_TEST_DATA, (value for value in ("str",))), 'str',
msg='allow iterable path')
self.assertEqual(traverse_obj(_TEST_DATA, 'str'), 'str',
msg='single items should be treated as a path')
self.assertEqual(traverse_obj(_TEST_DATA, None), _TEST_DATA)
self.assertEqual(traverse_obj(_TEST_DATA, 100), 100)
self.assertEqual(traverse_obj(_TEST_DATA, 1.2), 1.2)
# Test Ellipsis behavior
self.assertCountEqual(traverse_obj(_TEST_DATA, Ellipsis),
(item for item in _TEST_DATA.values() if item not in (None, {})),
msg='`...` should give all non-discarded values')
self.assertCountEqual(traverse_obj(_TEST_DATA, ('urls', 0, Ellipsis)), _TEST_DATA['urls'][0].values(),
msg='`...` selection for dicts should select all values')
self.assertEqual(traverse_obj(_TEST_DATA, (Ellipsis, Ellipsis, 'url')),
['https://www.example.com/0', 'https://www.example.com/1'],
msg='nested `...` queries should work')
self.assertCountEqual(traverse_obj(_TEST_DATA, (Ellipsis, Ellipsis, 'index')), iter_range(4),
msg='`...` query result should be flattened')
self.assertEqual(traverse_obj(iter(range(4)), Ellipsis), list(range(4)),
msg='`...` should accept iterables')
# Test function as key
self.assertEqual(traverse_obj(_TEST_DATA, lambda x, y: x == 'urls' and isinstance(y, list)),
[_TEST_DATA['urls']],
msg='function as query key should perform a filter based on (key, value)')
self.assertCountEqual(traverse_obj(_TEST_DATA, lambda _, x: isinstance(x[0], str)), set(('str',)),
msg='exceptions in the query function should be caught')
self.assertEqual(traverse_obj(iter(range(4)), lambda _, x: x % 2 == 0), [0, 2],
msg='function key should accept iterables')
if __debug__:
with self.assertRaises(Exception, msg='Wrong function signature should raise in debug'):
traverse_obj(_TEST_DATA, lambda a: Ellipsis)
with self.assertRaises(Exception, msg='Wrong function signature should raise in debug'):
traverse_obj(_TEST_DATA, lambda a, b, c: Ellipsis)
# Test set as key (transformation/type, like `expected_type`)
self.assertEqual(traverse_obj(_TEST_DATA, (Ellipsis, T(str.upper), )), ['STR'],
msg='Function in set should be a transformation')
self.assertEqual(traverse_obj(_TEST_DATA, ('fail', T(lambda _: 'const'))), 'const',
msg='Function in set should always be called')
self.assertEqual(traverse_obj(_TEST_DATA, (Ellipsis, T(str))), ['str'],
msg='Type in set should be a type filter')
self.assertMaybeCountEqual(traverse_obj(_TEST_DATA, (Ellipsis, T(str, int))), [100, 'str'],
msg='Multiple types in set should be a type filter')
self.assertEqual(traverse_obj(_TEST_DATA, T(dict)), _TEST_DATA,
msg='A single set should be wrapped into a path')
self.assertEqual(traverse_obj(_TEST_DATA, (Ellipsis, T(str.upper))), ['STR'],
msg='Transformation function should not raise')
self.assertMaybeCountEqual(traverse_obj(_TEST_DATA, (Ellipsis, T(str_or_none))),
[item for item in map(str_or_none, _TEST_DATA.values()) if item is not None],
msg='Function in set should be a transformation')
if __debug__:
with self.assertRaises(Exception, msg='Sets with length != 1 should raise in debug'):
traverse_obj(_TEST_DATA, set())
with self.assertRaises(Exception, msg='Sets with length != 1 should raise in debug'):
traverse_obj(_TEST_DATA, set((str.upper, str)))
# Test `slice` as a key
_SLICE_DATA = [0, 1, 2, 3, 4]
self.assertEqual(traverse_obj(_TEST_DATA, ('dict', slice(1))), None,
msg='slice on a dictionary should not throw')
self.assertEqual(traverse_obj(_SLICE_DATA, slice(1)), _SLICE_DATA[:1],
msg='slice key should apply slice to sequence')
self.assertEqual(traverse_obj(_SLICE_DATA, slice(1, 2)), _SLICE_DATA[1:2],
msg='slice key should apply slice to sequence')
self.assertEqual(traverse_obj(_SLICE_DATA, slice(1, 4, 2)), _SLICE_DATA[1:4:2],
msg='slice key should apply slice to sequence')
# Test alternative paths
self.assertEqual(traverse_obj(_TEST_DATA, 'fail', 'str'), 'str',
msg='multiple `paths` should be treated as alternative paths')
self.assertEqual(traverse_obj(_TEST_DATA, 'str', 100), 'str',
msg='alternatives should exit early')
self.assertEqual(traverse_obj(_TEST_DATA, 'fail', 'fail'), None,
msg='alternatives should return `default` if exhausted')
self.assertEqual(traverse_obj(_TEST_DATA, (Ellipsis, 'fail'), 100), 100,
msg='alternatives should track their own branching return')
self.assertEqual(traverse_obj(_TEST_DATA, ('dict', Ellipsis), ('data', Ellipsis)), list(_TEST_DATA['data']),
msg='alternatives on empty objects should search further')
# Test branch and path nesting
self.assertEqual(traverse_obj(_TEST_DATA, ('urls', (3, 0), 'url')), ['https://www.example.com/0'],
msg='tuple as key should be treated as branches')
self.assertEqual(traverse_obj(_TEST_DATA, ('urls', [3, 0], 'url')), ['https://www.example.com/0'],
msg='list as key should be treated as branches')
self.assertEqual(traverse_obj(_TEST_DATA, ('urls', ((1, 'fail'), (0, 'url')))), ['https://www.example.com/0'],
msg='double nesting in path should be treated as paths')
self.assertEqual(traverse_obj(['0', [1, 2]], [(0, 1), 0]), [1],
msg='do not fail early on branching')
self.assertCountEqual(traverse_obj(_TEST_DATA, ('urls', ((1, ('fail', 'url')), (0, 'url')))),
['https://www.example.com/0', 'https://www.example.com/1'],
msg='triple nesting in path should be treated as branches')
self.assertEqual(traverse_obj(_TEST_DATA, ('urls', ('fail', (Ellipsis, 'url')))),
['https://www.example.com/0', 'https://www.example.com/1'],
msg='ellipsis as branch path start gets flattened')
# Test dictionary as key
self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2}), {0: 100, 1: 1.2},
msg='dict key should result in a dict with the same keys')
self.assertEqual(traverse_obj(_TEST_DATA, {0: ('urls', 0, 'url')}),
{0: 'https://www.example.com/0'},
msg='dict key should allow paths')
self.assertEqual(traverse_obj(_TEST_DATA, {0: ('urls', (3, 0), 'url')}),
{0: ['https://www.example.com/0']},
msg='tuple in dict path should be treated as branches')
self.assertEqual(traverse_obj(_TEST_DATA, {0: ('urls', ((1, 'fail'), (0, 'url')))}),
{0: ['https://www.example.com/0']},
msg='double nesting in dict path should be treated as paths')
self.assertEqual(traverse_obj(_TEST_DATA, {0: ('urls', ((1, ('fail', 'url')), (0, 'url')))}),
{0: ['https://www.example.com/1', 'https://www.example.com/0']},
msg='triple nesting in dict path should be treated as branches')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 'fail'}), {},
msg='remove `None` values when top level dict key fails')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 'fail'}, default=Ellipsis), {0: Ellipsis},
msg='use `default` if key fails and `default`')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}), {},
msg='remove empty values when dict key')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}, default=Ellipsis), {0: Ellipsis},
msg='use `default` when dict key and a default')
self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 'fail'}}), {},
msg='remove empty values when nested dict key fails')
self.assertEqual(traverse_obj(None, {0: 'fail'}), {},
msg='default to dict if pruned')
self.assertEqual(traverse_obj(None, {0: 'fail'}, default=Ellipsis), {0: Ellipsis},
msg='default to dict if pruned and default is given')
self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 'fail'}}, default=Ellipsis), {0: {0: Ellipsis}},
msg='use nested `default` when nested dict key fails and `default`')
self.assertEqual(traverse_obj(_TEST_DATA, {0: ('dict', Ellipsis)}), {},
msg='remove key if branch in dict key not successful')
# Testing default parameter behavior
_DEFAULT_DATA = {'None': None, 'int': 0, 'list': []}
self.assertEqual(traverse_obj(_DEFAULT_DATA, 'fail'), None,
msg='default value should be `None`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, 'fail', 'fail', default=Ellipsis), Ellipsis,
msg='chained fails should result in default')
self.assertEqual(traverse_obj(_DEFAULT_DATA, 'None', 'int'), 0,
msg='should not short cirquit on `None`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, 'fail', default=1), 1,
msg='invalid dict key should result in `default`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, 'None', default=1), 1,
msg='`None` is a deliberate sentinel and should become `default`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, ('list', 10)), None,
msg='`IndexError` should result in `default`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, (Ellipsis, 'fail'), default=1), 1,
msg='if branched but not successful return `default` if defined, not `[]`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, (Ellipsis, 'fail'), default=None), None,
msg='if branched but not successful return `default` even if `default` is `None`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, (Ellipsis, 'fail')), [],
msg='if branched but not successful return `[]`, not `default`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, ('list', Ellipsis)), [],
msg='if branched but object is empty return `[]`, not `default`')
self.assertEqual(traverse_obj(None, Ellipsis), [],
msg='if branched but object is `None` return `[]`, not `default`')
self.assertEqual(traverse_obj({0: None}, (0, Ellipsis)), [],
msg='if branched but state is `None` return `[]`, not `default`')
branching_paths = [
('fail', Ellipsis),
(Ellipsis, 'fail'),
100 * ('fail',) + (Ellipsis,),
(Ellipsis,) + 100 * ('fail',),
]
for branching_path in branching_paths:
self.assertEqual(traverse_obj({}, branching_path), [],
msg='if branched but state is `None`, return `[]` (not `default`)')
self.assertEqual(traverse_obj({}, 'fail', branching_path), [],
msg='if branching in last alternative and previous did not match, return `[]` (not `default`)')
self.assertEqual(traverse_obj({0: 'x'}, 0, branching_path), 'x',
msg='if branching in last alternative and previous did match, return single value')
self.assertEqual(traverse_obj({0: 'x'}, branching_path, 0), 'x',
msg='if branching in first alternative and non-branching path does match, return single value')
self.assertEqual(traverse_obj({}, branching_path, 'fail'), None,
msg='if branching in first alternative and non-branching path does not match, return `default`')
# Testing expected_type behavior
_EXPECTED_TYPE_DATA = {'str': 'str', 'int': 0}
self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=str),
'str', msg='accept matching `expected_type` type')
self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=int),
None, msg='reject non-matching `expected_type` type')
self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'int', expected_type=lambda x: str(x)),
'0', msg='transform type using type function')
self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=lambda _: 1 / 0),
None, msg='wrap expected_type function in try_call')
self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, Ellipsis, expected_type=str),
['str'], msg='eliminate items that expected_type fails on')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2}, expected_type=int),
{0: 100}, msg='type as expected_type should filter dict values')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2, 2: 'None'}, expected_type=str_or_none),
{0: '100', 1: '1.2'}, msg='function as expected_type should transform dict values')
self.assertEqual(traverse_obj(_TEST_DATA, ({0: 1.2}, 0, set((int_or_none,))), expected_type=int),
1, msg='expected_type should not filter non-final dict values')
self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 100, 1: 'str'}}, expected_type=int),
{0: {0: 100}}, msg='expected_type should transform deep dict values')
self.assertEqual(traverse_obj(_TEST_DATA, [({0: '...'}, {0: '...'})], expected_type=type(Ellipsis)),
[{0: Ellipsis}, {0: Ellipsis}], msg='expected_type should transform branched dict values')
self.assertEqual(traverse_obj({1: {3: 4}}, [(1, 2), 3], expected_type=int),
[4], msg='expected_type regression for type matching in tuple branching')
self.assertEqual(traverse_obj(_TEST_DATA, ['data', Ellipsis], expected_type=int),
[], msg='expected_type regression for type matching in dict result')
# Test get_all behavior
_GET_ALL_DATA = {'key': [0, 1, 2]}
self.assertEqual(traverse_obj(_GET_ALL_DATA, ('key', Ellipsis), get_all=False), 0,
msg='if not `get_all`, return only first matching value')
self.assertEqual(traverse_obj(_GET_ALL_DATA, Ellipsis, get_all=False), [0, 1, 2],
msg='do not overflatten if not `get_all`')
# Test casesense behavior
_CASESENSE_DATA = {
'KeY': 'value0',
0: {
'KeY': 'value1',
0: {'KeY': 'value2'},
},
# FULLWIDTH LATIN CAPITAL LETTER K
'\uff2bey': 'value3',
}
self.assertEqual(traverse_obj(_CASESENSE_DATA, 'key'), None,
msg='dict keys should be case sensitive unless `casesense`')
self.assertEqual(traverse_obj(_CASESENSE_DATA, 'keY',
casesense=False), 'value0',
msg='allow non matching key case if `casesense`')
self.assertEqual(traverse_obj(_CASESENSE_DATA, '\uff4bey', # FULLWIDTH LATIN SMALL LETTER K
casesense=False), 'value3',
msg='allow non matching Unicode key case if `casesense`')
self.assertEqual(traverse_obj(_CASESENSE_DATA, (0, ('keY',)),
casesense=False), ['value1'],
msg='allow non matching key case in branch if `casesense`')
self.assertEqual(traverse_obj(_CASESENSE_DATA, (0, ((0, 'keY'),)),
casesense=False), ['value2'],
msg='allow non matching key case in branch path if `casesense`')
# Test traverse_string behavior
_TRAVERSE_STRING_DATA = {'str': 'str', 1.2: 1.2}
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', 0)), None,
msg='do not traverse into string if not `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', 0),
_traverse_string=True), 's',
msg='traverse into string if `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, (1.2, 1),
_traverse_string=True), '.',
msg='traverse into converted data if `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', Ellipsis),
_traverse_string=True), 'str',
msg='`...` should result in string (same value) if `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', slice(0, None, 2)),
_traverse_string=True), 'sr',
msg='`slice` should result in string if `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', lambda i, v: i or v == 's'),
_traverse_string=True), 'str',
msg='function should result in string if `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', (0, 2)),
_traverse_string=True), ['s', 'r'],
msg='branching should result in list if `traverse_string`')
self.assertEqual(traverse_obj({}, (0, Ellipsis), _traverse_string=True), [],
msg='branching should result in list if `traverse_string`')
self.assertEqual(traverse_obj({}, (0, lambda x, y: True), _traverse_string=True), [],
msg='branching should result in list if `traverse_string`')
self.assertEqual(traverse_obj({}, (0, slice(1)), _traverse_string=True), [],
msg='branching should result in list if `traverse_string`')
# Test re.Match as input obj
mobj = re.match(r'^0(12)(?P<group>3)(4)?$', '0123')
self.assertEqual(traverse_obj(mobj, Ellipsis), [x for x in mobj.groups() if x is not None],
msg='`...` on a `re.Match` should give its `groups()`')
self.assertEqual(traverse_obj(mobj, lambda k, _: k in (0, 2)), ['0123', '3'],
msg='function on a `re.Match` should give groupno, value starting at 0')
self.assertEqual(traverse_obj(mobj, 'group'), '3',
msg='str key on a `re.Match` should give group with that name')
self.assertEqual(traverse_obj(mobj, 2), '3',
msg='int key on a `re.Match` should give group with that name')
self.assertEqual(traverse_obj(mobj, 'gRoUp', casesense=False), '3',
msg='str key on a `re.Match` should respect casesense')
self.assertEqual(traverse_obj(mobj, 'fail'), None,
msg='failing str key on a `re.Match` should return `default`')
self.assertEqual(traverse_obj(mobj, 'gRoUpS', casesense=False), None,
msg='failing str key on a `re.Match` should return `default`')
self.assertEqual(traverse_obj(mobj, 8), None,
msg='failing int key on a `re.Match` should return `default`')
self.assertEqual(traverse_obj(mobj, lambda k, _: k in (0, 'group')), ['0123', '3'],
msg='function on a `re.Match` should give group name as well')
# Test xml.etree.ElementTree.Element as input obj
etree = compat_etree_fromstring('''<?xml version="1.0"?>
<data>
<country name="Liechtenstein">
<rank>1</rank>
<year>2008</year>
<gdppc>141100</gdppc>
<neighbor name="Austria" direction="E"/>
<neighbor name="Switzerland" direction="W"/>
</country>
<country name="Singapore">
<rank>4</rank>
<year>2011</year>
<gdppc>59900</gdppc>
<neighbor name="Malaysia" direction="N"/>
</country>
<country name="Panama">
<rank>68</rank>
<year>2011</year>
<gdppc>13600</gdppc>
<neighbor name="Costa Rica" direction="W"/>
<neighbor name="Colombia" direction="E"/>
</country>
</data>''')
self.assertEqual(traverse_obj(etree, ''), etree,
msg='empty str key should return the element itself')
self.assertEqual(traverse_obj(etree, 'country'), list(etree),
msg='str key should return all children with that tag name')
self.assertEqual(traverse_obj(etree, Ellipsis), list(etree),
msg='`...` as key should return all children')
self.assertEqual(traverse_obj(etree, lambda _, x: x[0].text == '4'), [etree[1]],
msg='function as key should get element as value')
self.assertEqual(traverse_obj(etree, lambda i, _: i == 1), [etree[1]],
msg='function as key should get index as key')
self.assertEqual(traverse_obj(etree, 0), etree[0],
msg='int key should return the nth child')
self.assertEqual(traverse_obj(etree, './/neighbor/@name'),
['Austria', 'Switzerland', 'Malaysia', 'Costa Rica', 'Colombia'],
msg='`@<attribute>` at end of path should give that attribute')
self.assertEqual(traverse_obj(etree, '//neighbor/@fail'), [None, None, None, None, None],
msg='`@<nonexistent>` at end of path should give `None`')
self.assertEqual(traverse_obj(etree, ('//neighbor/@', 2)), {'name': 'Malaysia', 'direction': 'N'},
msg='`@` should give the full attribute dict')
self.assertEqual(traverse_obj(etree, '//year/text()'), ['2008', '2011', '2011'],
msg='`text()` at end of path should give the inner text')
self.assertEqual(traverse_obj(etree, '//*[@direction]/@direction'), ['E', 'W', 'N', 'W', 'E'],
msg='full python xpath features should be supported')
self.assertEqual(traverse_obj(etree, (0, '@name')), 'Liechtenstein',
msg='special transformations should act on current element')
self.assertEqual(traverse_obj(etree, ('country', 0, Ellipsis, 'text()', T(int_or_none))), [1, 2008, 141100],
msg='special transformations should act on current element')
def test_traversal_unbranching(self):
self.assertEqual(traverse_obj(_TEST_DATA, [(100, 1.2), all]), [100, 1.2],
msg='`all` should give all results as list')
self.assertEqual(traverse_obj(_TEST_DATA, [(100, 1.2), any]), 100,
msg='`any` should give the first result')
self.assertEqual(traverse_obj(_TEST_DATA, [100, all]), [100],
msg='`all` should give list if non branching')
self.assertEqual(traverse_obj(_TEST_DATA, [100, any]), 100,
msg='`any` should give single item if non branching')
self.assertEqual(traverse_obj(_TEST_DATA, [('dict', 'None', 100), all]), [100],
msg='`all` should filter `None` and empty dict')
self.assertEqual(traverse_obj(_TEST_DATA, [('dict', 'None', 100), any]), 100,
msg='`any` should filter `None` and empty dict')
self.assertEqual(traverse_obj(_TEST_DATA, [{
'all': [('dict', 'None', 100, 1.2), all],
'any': [('dict', 'None', 100, 1.2), any],
}]), {'all': [100, 1.2], 'any': 100},
msg='`all`/`any` should apply to each dict path separately')
self.assertEqual(traverse_obj(_TEST_DATA, [{
'all': [('dict', 'None', 100, 1.2), all],
'any': [('dict', 'None', 100, 1.2), any],
}], get_all=False), {'all': [100, 1.2], 'any': 100},
msg='`all`/`any` should apply to dict regardless of `get_all`')
self.assertIs(traverse_obj(_TEST_DATA, [('dict', 'None', 100, 1.2), all, T(float)]), None,
msg='`all` should reset branching status')
self.assertIs(traverse_obj(_TEST_DATA, [('dict', 'None', 100, 1.2), any, T(float)]), None,
msg='`any` should reset branching status')
self.assertEqual(traverse_obj(_TEST_DATA, [('dict', 'None', 100, 1.2), all, Ellipsis, T(float)]), [1.2],
msg='`all` should allow further branching')
self.assertEqual(traverse_obj(_TEST_DATA, [('dict', 'None', 'urls', 'data'), any, Ellipsis, 'index']), [0, 1],
msg='`any` should allow further branching')
def test_traversal_morsel(self):
values = {
'expires': 'a',
'path': 'b',
'comment': 'c',
'domain': 'd',
'max-age': 'e',
'secure': 'f',
'httponly': 'g',
'version': 'h',
'samesite': 'i',
}
# SameSite added in Py3.8, breaks .update for 3.5-3.7
if sys.version_info < (3, 8):
del values['samesite']
morsel = compat_http_cookies.Morsel()
morsel.set(str('item_key'), 'item_value', 'coded_value')
morsel.update(values)
values['key'] = str('item_key')
values['value'] = 'item_value'
values = dict((str(k), v) for k, v in values.items())
# make test pass even without ordered dict
value_set = set(values.values())
for key, value in values.items():
self.assertEqual(traverse_obj(morsel, key), value,
msg='Morsel should provide access to all values')
self.assertEqual(set(traverse_obj(morsel, Ellipsis)), value_set,
msg='`...` should yield all values')
self.assertEqual(set(traverse_obj(morsel, lambda k, v: True)), value_set,
msg='function key should yield all values')
self.assertIs(traverse_obj(morsel, [(None,), any]), morsel,
msg='Morsel should not be implicitly changed to dict on usage')
def test_get_first(self):
self.assertEqual(get_first([{'a': None}, {'a': 'spam'}], 'a'), 'spam')
def test_dict_get(self):
FALSE_VALUES = {
'none': None,
'false': False,
'zero': 0,
'empty_string': '',
'empty_list': [],
}
d = FALSE_VALUES.copy()
d['a'] = 42
self.assertEqual(dict_get(d, 'a'), 42)
self.assertEqual(dict_get(d, 'b'), None)
self.assertEqual(dict_get(d, 'b', 42), 42)
self.assertEqual(dict_get(d, ('a', )), 42)
self.assertEqual(dict_get(d, ('b', 'a', )), 42)
self.assertEqual(dict_get(d, ('b', 'c', 'a', 'd', )), 42)
self.assertEqual(dict_get(d, ('b', 'c', )), None)
self.assertEqual(dict_get(d, ('b', 'c', ), 42), 42)
for key, false_value in FALSE_VALUES.items():
self.assertEqual(dict_get(d, ('b', 'c', key, )), None)
self.assertEqual(dict_get(d, ('b', 'c', key, ), skip_false_values=False), false_value)
if __name__ == '__main__':
unittest.main()

View File

@ -14,7 +14,6 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import io
import itertools
import json
import re
import xml.etree.ElementTree
from youtube_dl.utils import (
@ -28,7 +27,6 @@ from youtube_dl.utils import (
DateRange,
detect_exe_version,
determine_ext,
dict_get,
encode_base_n,
encode_compat_str,
encodeFilename,
@ -44,7 +42,6 @@ from youtube_dl.utils import (
get_element_by_attribute,
get_elements_by_class,
get_elements_by_attribute,
get_first,
InAdvancePagedList,
int_or_none,
intlist_to_bytes,
@ -84,14 +81,11 @@ from youtube_dl.utils import (
sanitized_Request,
shell_quote,
smuggle_url,
str_or_none,
str_to_int,
strip_jsonp,
strip_or_none,
subtitles_filename,
T,
timeconvert,
traverse_obj,
try_call,
unescapeHTML,
unified_strdate,
@ -132,10 +126,6 @@ from youtube_dl.compat import (
class TestUtil(unittest.TestCase):
# yt-dlp shim
def assertCountEqual(self, expected, got, msg='count should be the same'):
return self.assertEqual(len(tuple(expected)), len(tuple(got)), msg=msg)
def test_timeconvert(self):
self.assertTrue(timeconvert('') is None)
self.assertTrue(timeconvert('bougrg') is None)
@ -740,28 +730,6 @@ class TestUtil(unittest.TestCase):
self.assertRaises(
ValueError, multipart_encode, {b'field': b'value'}, boundary='value')
def test_dict_get(self):
FALSE_VALUES = {
'none': None,
'false': False,
'zero': 0,
'empty_string': '',
'empty_list': [],
}
d = FALSE_VALUES.copy()
d['a'] = 42
self.assertEqual(dict_get(d, 'a'), 42)
self.assertEqual(dict_get(d, 'b'), None)
self.assertEqual(dict_get(d, 'b', 42), 42)
self.assertEqual(dict_get(d, ('a', )), 42)
self.assertEqual(dict_get(d, ('b', 'a', )), 42)
self.assertEqual(dict_get(d, ('b', 'c', 'a', 'd', )), 42)
self.assertEqual(dict_get(d, ('b', 'c', )), None)
self.assertEqual(dict_get(d, ('b', 'c', ), 42), 42)
for key, false_value in FALSE_VALUES.items():
self.assertEqual(dict_get(d, ('b', 'c', key, )), None)
self.assertEqual(dict_get(d, ('b', 'c', key, ), skip_false_values=False), false_value)
def test_merge_dicts(self):
self.assertEqual(merge_dicts({'a': 1}, {'b': 2}), {'a': 1, 'b': 2})
self.assertEqual(merge_dicts({'a': 1}, {'a': 2}), {'a': 1})
@ -1703,336 +1671,6 @@ Line 1
self.assertEqual(variadic('spam', allowed_types=dict), 'spam')
self.assertEqual(variadic('spam', allowed_types=[dict]), 'spam')
def test_traverse_obj(self):
str = compat_str
_TEST_DATA = {
100: 100,
1.2: 1.2,
'str': 'str',
'None': None,
'...': Ellipsis,
'urls': [
{'index': 0, 'url': 'https://www.example.com/0'},
{'index': 1, 'url': 'https://www.example.com/1'},
],
'data': (
{'index': 2},
{'index': 3},
),
'dict': {},
}
# define a pukka Iterable
def iter_range(stop):
for from_ in range(stop):
yield from_
# Test base functionality
self.assertEqual(traverse_obj(_TEST_DATA, ('str',)), 'str',
msg='allow tuple path')
self.assertEqual(traverse_obj(_TEST_DATA, ['str']), 'str',
msg='allow list path')
self.assertEqual(traverse_obj(_TEST_DATA, (value for value in ("str",))), 'str',
msg='allow iterable path')
self.assertEqual(traverse_obj(_TEST_DATA, 'str'), 'str',
msg='single items should be treated as a path')
self.assertEqual(traverse_obj(_TEST_DATA, None), _TEST_DATA)
self.assertEqual(traverse_obj(_TEST_DATA, 100), 100)
self.assertEqual(traverse_obj(_TEST_DATA, 1.2), 1.2)
# Test Ellipsis behavior
self.assertCountEqual(traverse_obj(_TEST_DATA, Ellipsis),
(item for item in _TEST_DATA.values() if item not in (None, {})),
msg='`...` should give all non-discarded values')
self.assertCountEqual(traverse_obj(_TEST_DATA, ('urls', 0, Ellipsis)), _TEST_DATA['urls'][0].values(),
msg='`...` selection for dicts should select all values')
self.assertEqual(traverse_obj(_TEST_DATA, (Ellipsis, Ellipsis, 'url')),
['https://www.example.com/0', 'https://www.example.com/1'],
msg='nested `...` queries should work')
self.assertCountEqual(traverse_obj(_TEST_DATA, (Ellipsis, Ellipsis, 'index')), iter_range(4),
msg='`...` query result should be flattened')
self.assertEqual(traverse_obj(iter(range(4)), Ellipsis), list(range(4)),
msg='`...` should accept iterables')
# Test function as key
self.assertEqual(traverse_obj(_TEST_DATA, lambda x, y: x == 'urls' and isinstance(y, list)),
[_TEST_DATA['urls']],
msg='function as query key should perform a filter based on (key, value)')
self.assertCountEqual(traverse_obj(_TEST_DATA, lambda _, x: isinstance(x[0], str)), set(('str',)),
msg='exceptions in the query function should be caught')
self.assertEqual(traverse_obj(iter(range(4)), lambda _, x: x % 2 == 0), [0, 2],
msg='function key should accept iterables')
if __debug__:
with self.assertRaises(Exception, msg='Wrong function signature should raise in debug'):
traverse_obj(_TEST_DATA, lambda a: Ellipsis)
with self.assertRaises(Exception, msg='Wrong function signature should raise in debug'):
traverse_obj(_TEST_DATA, lambda a, b, c: Ellipsis)
# Test set as key (transformation/type, like `expected_type`)
self.assertEqual(traverse_obj(_TEST_DATA, (Ellipsis, T(str.upper), )), ['STR'],
msg='Function in set should be a transformation')
self.assertEqual(traverse_obj(_TEST_DATA, (Ellipsis, T(str))), ['str'],
msg='Type in set should be a type filter')
self.assertEqual(traverse_obj(_TEST_DATA, T(dict)), _TEST_DATA,
msg='A single set should be wrapped into a path')
self.assertEqual(traverse_obj(_TEST_DATA, (Ellipsis, T(str.upper))), ['STR'],
msg='Transformation function should not raise')
self.assertEqual(traverse_obj(_TEST_DATA, (Ellipsis, T(str_or_none))),
[item for item in map(str_or_none, _TEST_DATA.values()) if item is not None],
msg='Function in set should be a transformation')
if __debug__:
with self.assertRaises(Exception, msg='Sets with length != 1 should raise in debug'):
traverse_obj(_TEST_DATA, set())
with self.assertRaises(Exception, msg='Sets with length != 1 should raise in debug'):
traverse_obj(_TEST_DATA, set((str.upper, str)))
# Test `slice` as a key
_SLICE_DATA = [0, 1, 2, 3, 4]
self.assertEqual(traverse_obj(_TEST_DATA, ('dict', slice(1))), None,
msg='slice on a dictionary should not throw')
self.assertEqual(traverse_obj(_SLICE_DATA, slice(1)), _SLICE_DATA[:1],
msg='slice key should apply slice to sequence')
self.assertEqual(traverse_obj(_SLICE_DATA, slice(1, 2)), _SLICE_DATA[1:2],
msg='slice key should apply slice to sequence')
self.assertEqual(traverse_obj(_SLICE_DATA, slice(1, 4, 2)), _SLICE_DATA[1:4:2],
msg='slice key should apply slice to sequence')
# Test alternative paths
self.assertEqual(traverse_obj(_TEST_DATA, 'fail', 'str'), 'str',
msg='multiple `paths` should be treated as alternative paths')
self.assertEqual(traverse_obj(_TEST_DATA, 'str', 100), 'str',
msg='alternatives should exit early')
self.assertEqual(traverse_obj(_TEST_DATA, 'fail', 'fail'), None,
msg='alternatives should return `default` if exhausted')
self.assertEqual(traverse_obj(_TEST_DATA, (Ellipsis, 'fail'), 100), 100,
msg='alternatives should track their own branching return')
self.assertEqual(traverse_obj(_TEST_DATA, ('dict', Ellipsis), ('data', Ellipsis)), list(_TEST_DATA['data']),
msg='alternatives on empty objects should search further')
# Test branch and path nesting
self.assertEqual(traverse_obj(_TEST_DATA, ('urls', (3, 0), 'url')), ['https://www.example.com/0'],
msg='tuple as key should be treated as branches')
self.assertEqual(traverse_obj(_TEST_DATA, ('urls', [3, 0], 'url')), ['https://www.example.com/0'],
msg='list as key should be treated as branches')
self.assertEqual(traverse_obj(_TEST_DATA, ('urls', ((1, 'fail'), (0, 'url')))), ['https://www.example.com/0'],
msg='double nesting in path should be treated as paths')
self.assertEqual(traverse_obj(['0', [1, 2]], [(0, 1), 0]), [1],
msg='do not fail early on branching')
self.assertCountEqual(traverse_obj(_TEST_DATA, ('urls', ((1, ('fail', 'url')), (0, 'url')))),
['https://www.example.com/0', 'https://www.example.com/1'],
msg='triple nesting in path should be treated as branches')
self.assertEqual(traverse_obj(_TEST_DATA, ('urls', ('fail', (Ellipsis, 'url')))),
['https://www.example.com/0', 'https://www.example.com/1'],
msg='ellipsis as branch path start gets flattened')
# Test dictionary as key
self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2}), {0: 100, 1: 1.2},
msg='dict key should result in a dict with the same keys')
self.assertEqual(traverse_obj(_TEST_DATA, {0: ('urls', 0, 'url')}),
{0: 'https://www.example.com/0'},
msg='dict key should allow paths')
self.assertEqual(traverse_obj(_TEST_DATA, {0: ('urls', (3, 0), 'url')}),
{0: ['https://www.example.com/0']},
msg='tuple in dict path should be treated as branches')
self.assertEqual(traverse_obj(_TEST_DATA, {0: ('urls', ((1, 'fail'), (0, 'url')))}),
{0: ['https://www.example.com/0']},
msg='double nesting in dict path should be treated as paths')
self.assertEqual(traverse_obj(_TEST_DATA, {0: ('urls', ((1, ('fail', 'url')), (0, 'url')))}),
{0: ['https://www.example.com/1', 'https://www.example.com/0']},
msg='triple nesting in dict path should be treated as branches')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 'fail'}), {},
msg='remove `None` values when top level dict key fails')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 'fail'}, default=Ellipsis), {0: Ellipsis},
msg='use `default` if key fails and `default`')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}), {},
msg='remove empty values when dict key')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}, default=Ellipsis), {0: Ellipsis},
msg='use `default` when dict key and a default')
self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 'fail'}}), {},
msg='remove empty values when nested dict key fails')
self.assertEqual(traverse_obj(None, {0: 'fail'}), {},
msg='default to dict if pruned')
self.assertEqual(traverse_obj(None, {0: 'fail'}, default=Ellipsis), {0: Ellipsis},
msg='default to dict if pruned and default is given')
self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 'fail'}}, default=Ellipsis), {0: {0: Ellipsis}},
msg='use nested `default` when nested dict key fails and `default`')
self.assertEqual(traverse_obj(_TEST_DATA, {0: ('dict', Ellipsis)}), {},
msg='remove key if branch in dict key not successful')
# Testing default parameter behavior
_DEFAULT_DATA = {'None': None, 'int': 0, 'list': []}
self.assertEqual(traverse_obj(_DEFAULT_DATA, 'fail'), None,
msg='default value should be `None`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, 'fail', 'fail', default=Ellipsis), Ellipsis,
msg='chained fails should result in default')
self.assertEqual(traverse_obj(_DEFAULT_DATA, 'None', 'int'), 0,
msg='should not short cirquit on `None`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, 'fail', default=1), 1,
msg='invalid dict key should result in `default`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, 'None', default=1), 1,
msg='`None` is a deliberate sentinel and should become `default`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, ('list', 10)), None,
msg='`IndexError` should result in `default`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, (Ellipsis, 'fail'), default=1), 1,
msg='if branched but not successful return `default` if defined, not `[]`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, (Ellipsis, 'fail'), default=None), None,
msg='if branched but not successful return `default` even if `default` is `None`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, (Ellipsis, 'fail')), [],
msg='if branched but not successful return `[]`, not `default`')
self.assertEqual(traverse_obj(_DEFAULT_DATA, ('list', Ellipsis)), [],
msg='if branched but object is empty return `[]`, not `default`')
self.assertEqual(traverse_obj(None, Ellipsis), [],
msg='if branched but object is `None` return `[]`, not `default`')
self.assertEqual(traverse_obj({0: None}, (0, Ellipsis)), [],
msg='if branched but state is `None` return `[]`, not `default`')
branching_paths = [
('fail', Ellipsis),
(Ellipsis, 'fail'),
100 * ('fail',) + (Ellipsis,),
(Ellipsis,) + 100 * ('fail',),
]
for branching_path in branching_paths:
self.assertEqual(traverse_obj({}, branching_path), [],
msg='if branched but state is `None`, return `[]` (not `default`)')
self.assertEqual(traverse_obj({}, 'fail', branching_path), [],
msg='if branching in last alternative and previous did not match, return `[]` (not `default`)')
self.assertEqual(traverse_obj({0: 'x'}, 0, branching_path), 'x',
msg='if branching in last alternative and previous did match, return single value')
self.assertEqual(traverse_obj({0: 'x'}, branching_path, 0), 'x',
msg='if branching in first alternative and non-branching path does match, return single value')
self.assertEqual(traverse_obj({}, branching_path, 'fail'), None,
msg='if branching in first alternative and non-branching path does not match, return `default`')
# Testing expected_type behavior
_EXPECTED_TYPE_DATA = {'str': 'str', 'int': 0}
self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=str),
'str', msg='accept matching `expected_type` type')
self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=int),
None, msg='reject non-matching `expected_type` type')
self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'int', expected_type=lambda x: str(x)),
'0', msg='transform type using type function')
self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=lambda _: 1 / 0),
None, msg='wrap expected_type function in try_call')
self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, Ellipsis, expected_type=str),
['str'], msg='eliminate items that expected_type fails on')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2}, expected_type=int),
{0: 100}, msg='type as expected_type should filter dict values')
self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2, 2: 'None'}, expected_type=str_or_none),
{0: '100', 1: '1.2'}, msg='function as expected_type should transform dict values')
self.assertEqual(traverse_obj(_TEST_DATA, ({0: 1.2}, 0, set((int_or_none,))), expected_type=int),
1, msg='expected_type should not filter non-final dict values')
self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 100, 1: 'str'}}, expected_type=int),
{0: {0: 100}}, msg='expected_type should transform deep dict values')
self.assertEqual(traverse_obj(_TEST_DATA, [({0: '...'}, {0: '...'})], expected_type=type(Ellipsis)),
[{0: Ellipsis}, {0: Ellipsis}], msg='expected_type should transform branched dict values')
self.assertEqual(traverse_obj({1: {3: 4}}, [(1, 2), 3], expected_type=int),
[4], msg='expected_type regression for type matching in tuple branching')
self.assertEqual(traverse_obj(_TEST_DATA, ['data', Ellipsis], expected_type=int),
[], msg='expected_type regression for type matching in dict result')
# Test get_all behavior
_GET_ALL_DATA = {'key': [0, 1, 2]}
self.assertEqual(traverse_obj(_GET_ALL_DATA, ('key', Ellipsis), get_all=False), 0,
msg='if not `get_all`, return only first matching value')
self.assertEqual(traverse_obj(_GET_ALL_DATA, Ellipsis, get_all=False), [0, 1, 2],
msg='do not overflatten if not `get_all`')
# Test casesense behavior
_CASESENSE_DATA = {
'KeY': 'value0',
0: {
'KeY': 'value1',
0: {'KeY': 'value2'},
},
# FULLWIDTH LATIN CAPITAL LETTER K
'\uff2bey': 'value3',
}
self.assertEqual(traverse_obj(_CASESENSE_DATA, 'key'), None,
msg='dict keys should be case sensitive unless `casesense`')
self.assertEqual(traverse_obj(_CASESENSE_DATA, 'keY',
casesense=False), 'value0',
msg='allow non matching key case if `casesense`')
self.assertEqual(traverse_obj(_CASESENSE_DATA, '\uff4bey', # FULLWIDTH LATIN SMALL LETTER K
casesense=False), 'value3',
msg='allow non matching Unicode key case if `casesense`')
self.assertEqual(traverse_obj(_CASESENSE_DATA, (0, ('keY',)),
casesense=False), ['value1'],
msg='allow non matching key case in branch if `casesense`')
self.assertEqual(traverse_obj(_CASESENSE_DATA, (0, ((0, 'keY'),)),
casesense=False), ['value2'],
msg='allow non matching key case in branch path if `casesense`')
# Test traverse_string behavior
_TRAVERSE_STRING_DATA = {'str': 'str', 1.2: 1.2}
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', 0)), None,
msg='do not traverse into string if not `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', 0),
_traverse_string=True), 's',
msg='traverse into string if `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, (1.2, 1),
_traverse_string=True), '.',
msg='traverse into converted data if `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', Ellipsis),
_traverse_string=True), 'str',
msg='`...` should result in string (same value) if `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', slice(0, None, 2)),
_traverse_string=True), 'sr',
msg='`slice` should result in string if `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', lambda i, v: i or v == 's'),
_traverse_string=True), 'str',
msg='function should result in string if `traverse_string`')
self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', (0, 2)),
_traverse_string=True), ['s', 'r'],
msg='branching should result in list if `traverse_string`')
self.assertEqual(traverse_obj({}, (0, Ellipsis), _traverse_string=True), [],
msg='branching should result in list if `traverse_string`')
self.assertEqual(traverse_obj({}, (0, lambda x, y: True), _traverse_string=True), [],
msg='branching should result in list if `traverse_string`')
self.assertEqual(traverse_obj({}, (0, slice(1)), _traverse_string=True), [],
msg='branching should result in list if `traverse_string`')
# Test is_user_input behavior
_IS_USER_INPUT_DATA = {'range8': list(range(8))}
self.assertEqual(traverse_obj(_IS_USER_INPUT_DATA, ('range8', '3'),
_is_user_input=True), 3,
msg='allow for string indexing if `is_user_input`')
self.assertCountEqual(traverse_obj(_IS_USER_INPUT_DATA, ('range8', '3:'),
_is_user_input=True), tuple(range(8))[3:],
msg='allow for string slice if `is_user_input`')
self.assertCountEqual(traverse_obj(_IS_USER_INPUT_DATA, ('range8', ':4:2'),
_is_user_input=True), tuple(range(8))[:4:2],
msg='allow step in string slice if `is_user_input`')
self.assertCountEqual(traverse_obj(_IS_USER_INPUT_DATA, ('range8', ':'),
_is_user_input=True), range(8),
msg='`:` should be treated as `...` if `is_user_input`')
with self.assertRaises(TypeError, msg='too many params should result in error'):
traverse_obj(_IS_USER_INPUT_DATA, ('range8', ':::'), _is_user_input=True)
# Test re.Match as input obj
mobj = re.match(r'^0(12)(?P<group>3)(4)?$', '0123')
self.assertEqual(traverse_obj(mobj, Ellipsis), [x for x in mobj.groups() if x is not None],
msg='`...` on a `re.Match` should give its `groups()`')
self.assertEqual(traverse_obj(mobj, lambda k, _: k in (0, 2)), ['0123', '3'],
msg='function on a `re.Match` should give groupno, value starting at 0')
self.assertEqual(traverse_obj(mobj, 'group'), '3',
msg='str key on a `re.Match` should give group with that name')
self.assertEqual(traverse_obj(mobj, 2), '3',
msg='int key on a `re.Match` should give group with that name')
self.assertEqual(traverse_obj(mobj, 'gRoUp', casesense=False), '3',
msg='str key on a `re.Match` should respect casesense')
self.assertEqual(traverse_obj(mobj, 'fail'), None,
msg='failing str key on a `re.Match` should return `default`')
self.assertEqual(traverse_obj(mobj, 'gRoUpS', casesense=False), None,
msg='failing str key on a `re.Match` should return `default`')
self.assertEqual(traverse_obj(mobj, 8), None,
msg='failing int key on a `re.Match` should return `default`')
self.assertEqual(traverse_obj(mobj, lambda k, _: k in (0, 'group')), ['0123', '3'],
msg='function on a `re.Match` should give group name as well')
def test_get_first(self):
self.assertEqual(get_first([{'a': None}, {'a': 'spam'}], 'a'), 'spam')
def test_join_nonempty(self):
self.assertEqual(join_nonempty('a', 'b'), 'a-b')
self.assertEqual(join_nonempty(

View File

@ -1043,8 +1043,8 @@ class YoutubeDL(object):
elif result_type in ('playlist', 'multi_video'):
# Protect from infinite recursion due to recursively nested playlists
# (see https://github.com/ytdl-org/youtube-dl/issues/27833)
webpage_url = ie_result['webpage_url']
if webpage_url in self._playlist_urls:
webpage_url = ie_result.get('webpage_url') # not all pl/mv have this
if webpage_url and webpage_url in self._playlist_urls:
self.to_screen(
'[download] Skipping already downloaded playlist: %s'
% ie_result.get('title') or ie_result.get('id'))
@ -1052,6 +1052,10 @@ class YoutubeDL(object):
self._playlist_level += 1
self._playlist_urls.add(webpage_url)
new_result = dict((k, v) for k, v in extra_info.items() if k not in ie_result)
if new_result:
new_result.update(ie_result)
ie_result = new_result
try:
return self.__process_playlist(ie_result, download)
finally:
@ -1597,6 +1601,28 @@ class YoutubeDL(object):
self.cookiejar.add_cookie_header(pr)
return pr.get_header('Cookie')
def _fill_common_fields(self, info_dict, final=True):
for ts_key, date_key in (
('timestamp', 'upload_date'),
('release_timestamp', 'release_date'),
):
if info_dict.get(date_key) is None and info_dict.get(ts_key) is not None:
# Working around out-of-range timestamp values (e.g. negative ones on Windows,
# see http://bugs.python.org/issue1646728)
try:
upload_date = datetime.datetime.utcfromtimestamp(info_dict[ts_key])
info_dict[date_key] = compat_str(upload_date.strftime('%Y%m%d'))
except (ValueError, OverflowError, OSError):
pass
# Auto generate title fields corresponding to the *_number fields when missing
# in order to always have clean titles. This is very common for TV series.
if final:
for field in ('chapter', 'season', 'episode'):
if info_dict.get('%s_number' % field) is not None and not info_dict.get(field):
info_dict[field] = '%s %d' % (field.capitalize(), info_dict['%s_number' % field])
def process_video_result(self, info_dict, download=True):
assert info_dict.get('_type', 'video') == 'video'
@ -1664,24 +1690,7 @@ class YoutubeDL(object):
if 'display_id' not in info_dict and 'id' in info_dict:
info_dict['display_id'] = info_dict['id']
for ts_key, date_key in (
('timestamp', 'upload_date'),
('release_timestamp', 'release_date'),
):
if info_dict.get(date_key) is None and info_dict.get(ts_key) is not None:
# Working around out-of-range timestamp values (e.g. negative ones on Windows,
# see http://bugs.python.org/issue1646728)
try:
upload_date = datetime.datetime.utcfromtimestamp(info_dict[ts_key])
info_dict[date_key] = compat_str(upload_date.strftime('%Y%m%d'))
except (ValueError, OverflowError, OSError):
pass
# Auto generate title fields corresponding to the *_number fields when missing
# in order to always have clean titles. This is very common for TV series.
for field in ('chapter', 'season', 'episode'):
if info_dict.get('%s_number' % field) is not None and not info_dict.get(field):
info_dict[field] = '%s %d' % (field.capitalize(), info_dict['%s_number' % field])
self._fill_common_fields(info_dict)
for cc_kind in ('subtitles', 'automatic_captions'):
cc = info_dict.get(cc_kind)

View File

@ -2719,8 +2719,222 @@ if sys.version_info < (2, 7):
if isinstance(xpath, compat_str):
xpath = xpath.encode('ascii')
return xpath
# further code below based on CPython 2.7 source
import functools
_xpath_tokenizer_re = re.compile(r'''(?x)
( # (1)
'[^']*'|"[^"]*"| # quoted strings, or
::|//?|\.\.|\(\)|[/.*:[\]()@=] # navigation specials
)| # or (2)
((?:\{[^}]+\})?[^/[\]()@=\s]+)| # token: optional {ns}, no specials
\s+ # or white space
''')
def _xpath_tokenizer(pattern, namespaces=None):
for token in _xpath_tokenizer_re.findall(pattern):
tag = token[1]
if tag and tag[0] != "{" and ":" in tag:
try:
if not namespaces:
raise KeyError
prefix, uri = tag.split(":", 1)
yield token[0], "{%s}%s" % (namespaces[prefix], uri)
except KeyError:
raise SyntaxError("prefix %r not found in prefix map" % prefix)
else:
yield token
def _get_parent_map(context):
parent_map = context.parent_map
if parent_map is None:
context.parent_map = parent_map = {}
for p in context.root.getiterator():
for e in p:
parent_map[e] = p
return parent_map
def _select(context, result, filter_fn=lambda *_: True):
for elem in result:
for e in elem:
if filter_fn(e, elem):
yield e
def _prepare_child(next_, token):
tag = token[1]
return functools.partial(_select, filter_fn=lambda e, _: e.tag == tag)
def _prepare_star(next_, token):
return _select
def _prepare_self(next_, token):
return lambda _, result: (e for e in result)
def _prepare_descendant(next_, token):
token = next(next_)
if token[0] == "*":
tag = "*"
elif not token[0]:
tag = token[1]
else:
raise SyntaxError("invalid descendant")
def select(context, result):
for elem in result:
for e in elem.getiterator(tag):
if e is not elem:
yield e
return select
def _prepare_parent(next_, token):
def select(context, result):
# FIXME: raise error if .. is applied at toplevel?
parent_map = _get_parent_map(context)
result_map = {}
for elem in result:
if elem in parent_map:
parent = parent_map[elem]
if parent not in result_map:
result_map[parent] = None
yield parent
return select
def _prepare_predicate(next_, token):
signature = []
predicate = []
for token in next_:
if token[0] == "]":
break
if token[0] and token[0][:1] in "'\"":
token = "'", token[0][1:-1]
signature.append(token[0] or "-")
predicate.append(token[1])
def select(context, result, filter_fn=lambda _: True):
for elem in result:
if filter_fn(elem):
yield elem
signature = "".join(signature)
# use signature to determine predicate type
if signature == "@-":
# [@attribute] predicate
key = predicate[1]
return functools.partial(
select, filter_fn=lambda el: el.get(key) is not None)
if signature == "@-='":
# [@attribute='value']
key = predicate[1]
value = predicate[-1]
return functools.partial(
select, filter_fn=lambda el: el.get(key) == value)
if signature == "-" and not re.match(r"\d+$", predicate[0]):
# [tag]
tag = predicate[0]
return functools.partial(
select, filter_fn=lambda el: el.find(tag) is not None)
if signature == "-='" and not re.match(r"\d+$", predicate[0]):
# [tag='value']
tag = predicate[0]
value = predicate[-1]
def itertext(el):
for e in el.getiterator():
e = e.text
if e:
yield e
def select(context, result):
for elem in result:
for e in elem.findall(tag):
if "".join(itertext(e)) == value:
yield elem
break
return select
if signature == "-" or signature == "-()" or signature == "-()-":
# [index] or [last()] or [last()-index]
if signature == "-":
index = int(predicate[0]) - 1
else:
if predicate[0] != "last":
raise SyntaxError("unsupported function")
if signature == "-()-":
try:
index = int(predicate[2]) - 1
except ValueError:
raise SyntaxError("unsupported expression")
else:
index = -1
def select(context, result):
parent_map = _get_parent_map(context)
for elem in result:
try:
parent = parent_map[elem]
# FIXME: what if the selector is "*" ?
elems = list(parent.findall(elem.tag))
if elems[index] is elem:
yield elem
except (IndexError, KeyError):
pass
return select
raise SyntaxError("invalid predicate")
ops = {
"": _prepare_child,
"*": _prepare_star,
".": _prepare_self,
"..": _prepare_parent,
"//": _prepare_descendant,
"[": _prepare_predicate,
}
_cache = {}
class _SelectorContext:
parent_map = None
def __init__(self, root):
self.root = root
##
# Generate all matching objects.
def compat_etree_iterfind(elem, path, namespaces=None):
# compile selector pattern
if path[-1:] == "/":
path = path + "*" # implicit all (FIXME: keep this?)
try:
selector = _cache[path]
except KeyError:
if len(_cache) > 100:
_cache.clear()
if path[:1] == "/":
raise SyntaxError("cannot use absolute path on element")
tokens = _xpath_tokenizer(path, namespaces)
selector = []
for token in tokens:
if token[0] == "/":
continue
try:
selector.append(ops[token[0]](tokens, token))
except StopIteration:
raise SyntaxError("invalid path")
_cache[path] = selector
# execute selector pattern
result = [elem]
context = _SelectorContext(elem)
for select in selector:
result = select(context, result)
return result
# end of code based on CPython 2.7 source
else:
compat_xpath = lambda xpath: xpath
compat_etree_iterfind = lambda element, match: element.iterfind(match)
compat_os_name = os._name if os.name == 'java' else os.name
@ -2756,7 +2970,7 @@ except (AssertionError, UnicodeEncodeError):
def compat_ord(c):
if type(c) is int:
if isinstance(c, int):
return c
else:
return ord(c)
@ -2955,7 +3169,7 @@ except ImportError:
return self
def __exit__(self, exc_type, exc_val, exc_tb):
return exc_val is not None and isinstance(exc_val, self._exceptions or tuple())
return exc_type is not None and issubclass(exc_type, self._exceptions or tuple())
# subprocess.Popen context manager
@ -3308,6 +3522,7 @@ __all__ = [
'compat_contextlib_suppress',
'compat_ctypes_WINFUNCTYPE',
'compat_etree_fromstring',
'compat_etree_iterfind',
'compat_filter',
'compat_get_terminal_size',
'compat_getenv',

View File

@ -1169,10 +1169,10 @@ class InfoExtractor(object):
def _get_netrc_login_info(self, netrc_machine=None):
username = None
password = None
netrc_machine = netrc_machine or self._NETRC_MACHINE
if self._downloader.params.get('usenetrc', False):
try:
netrc_machine = netrc_machine or self._NETRC_MACHINE
info = netrc.netrc().authenticators(netrc_machine)
if info is not None:
username = info[0]
@ -1180,7 +1180,7 @@ class InfoExtractor(object):
else:
raise netrc.NetrcParseError(
'No authenticators for %s' % netrc_machine)
except (IOError, netrc.NetrcParseError) as err:
except (AttributeError, IOError, netrc.NetrcParseError) as err:
self._downloader.report_warning(
'parsing .netrc: %s' % error_to_compat_str(err))
@ -1490,14 +1490,18 @@ class InfoExtractor(object):
return dict((k, v) for k, v in info.items() if v is not None)
def _search_nextjs_data(self, webpage, video_id, **kw):
nkw = dict((k, v) for k, v in kw.items() if k in ('transform_source', 'fatal'))
kw.pop('transform_source', None)
next_data = self._search_regex(
r'''<script[^>]+\bid\s*=\s*('|")__NEXT_DATA__\1[^>]*>(?P<nd>[^<]+)</script>''',
webpage, 'next.js data', group='nd', **kw)
if not next_data:
return {}
return self._parse_json(next_data, video_id, **nkw)
# ..., *, transform_source=None, fatal=True, default=NO_DEFAULT
# TODO: remove this backward compat
default = kw.get('default', NO_DEFAULT)
if default == '{}':
kw['default'] = {}
kw = compat_kwargs(kw)
return self._search_json(
r'''<script\s[^>]*?\bid\s*=\s*('|")__NEXT_DATA__\1[^>]*>''',
webpage, 'next.js data', video_id, end_pattern='</script>',
**kw)
def _search_nuxt_data(self, webpage, video_id, *args, **kwargs):
"""Parses Nuxt.js metadata. This works as long as the function __NUXT__ invokes is a pure function"""
@ -3296,12 +3300,16 @@ class InfoExtractor(object):
return ret
@classmethod
def _merge_subtitles(cls, subtitle_dict1, subtitle_dict2):
""" Merge two subtitle dictionaries, language by language. """
ret = dict(subtitle_dict1)
for lang in subtitle_dict2:
ret[lang] = cls._merge_subtitle_items(subtitle_dict1.get(lang, []), subtitle_dict2[lang])
return ret
def _merge_subtitles(cls, subtitle_dict1, *subtitle_dicts, **kwargs):
""" Merge subtitle dictionaries, language by language. """
# ..., * , target=None
target = kwargs.get('target') or dict(subtitle_dict1)
for subtitle_dict in subtitle_dicts:
for lang in subtitle_dict:
target[lang] = cls._merge_subtitle_items(target.get(lang, []), subtitle_dict[lang])
return target
def extract_automatic_captions(self, *args, **kwargs):
if (self._downloader.params.get('writeautomaticsub', False)
@ -3334,6 +3342,29 @@ class InfoExtractor(object):
def _generic_title(self, url):
return compat_urllib_parse_unquote(os.path.splitext(url_basename(url))[0])
def _yes_playlist(self, playlist_id, video_id, *args, **kwargs):
# smuggled_data=None, *, playlist_label='playlist', video_label='video'
smuggled_data = args[0] if len(args) == 1 else kwargs.get('smuggled_data')
playlist_label = kwargs.get('playlist_label', 'playlist')
video_label = kwargs.get('video_label', 'video')
if not playlist_id or not video_id:
return not video_id
no_playlist = (smuggled_data or {}).get('force_noplaylist')
if no_playlist is not None:
return not no_playlist
video_id = '' if video_id is True else ' ' + video_id
noplaylist = self.get_param('noplaylist')
self.to_screen(
'Downloading just the {0}{1} because of --no-playlist'.format(video_label, video_id)
if noplaylist else
'Downloading {0}{1} - add --no-playlist to download just the {2}{3}'.format(
playlist_label, '' if playlist_id is True else ' ' + playlist_id,
video_label, video_id))
return not noplaylist
class SearchInfoExtractor(InfoExtractor):
"""

View File

@ -898,21 +898,13 @@ from .ooyala import (
)
from .ora import OraTVIE
from .orf import (
ORFTVthekIE,
ORFFM4IE,
ORFONIE,
ORFONLiveIE,
ORFFM4StoryIE,
ORFOE1IE,
ORFOE3IE,
ORFNOEIE,
ORFWIEIE,
ORFBGLIE,
ORFOOEIE,
ORFSTMIE,
ORFKTNIE,
ORFSBGIE,
ORFTIRIE,
ORFVBGIE,
ORFIPTVIE,
ORFPodcastIE,
ORFRadioIE,
ORFRadioCollectionIE,
)
from .outsidetv import OutsideTVIE
from .packtpub import (
@ -1653,7 +1645,15 @@ from .younow import (
YouNowChannelIE,
YouNowMomentIE,
)
from .youporn import YouPornIE
from .youporn import (
YouPornIE,
YouPornCategoryIE,
YouPornChannelIE,
YouPornCollectionIE,
YouPornStarIE,
YouPornTagIE,
YouPornVideosIE,
)
from .yourporn import YourPornIE
from .yourupload import YourUploadIE
from .youtube import (

View File

@ -1,3 +1,4 @@
# coding: utf-8
from __future__ import unicode_literals
import itertools
@ -10,7 +11,7 @@ from ..compat import (
compat_ord,
compat_str,
compat_urllib_parse_unquote,
compat_zip
compat_zip as zip,
)
from ..utils import (
int_or_none,
@ -24,7 +25,7 @@ class MixcloudBaseIE(InfoExtractor):
def _call_api(self, object_type, object_fields, display_id, username, slug=None):
lookup_key = object_type + 'Lookup'
return self._download_json(
'https://www.mixcloud.com/graphql', display_id, query={
'https://app.mixcloud.com/graphql', display_id, query={
'query': '''{
%s(lookup: {username: "%s"%s}) {
%s
@ -44,7 +45,7 @@ class MixcloudIE(MixcloudBaseIE):
'ext': 'm4a',
'title': 'Cryptkeeper',
'description': 'After quite a long silence from myself, finally another Drum\'n\'Bass mix with my favourite current dance floor bangers.',
'uploader': 'Daniel Holbach',
'uploader': 'dholbach', # was: 'Daniel Holbach',
'uploader_id': 'dholbach',
'thumbnail': r're:https?://.*\.jpg',
'view_count': int,
@ -57,7 +58,7 @@ class MixcloudIE(MixcloudBaseIE):
'id': 'gillespeterson_caribou-7-inch-vinyl-mix-chat',
'ext': 'mp3',
'title': 'Caribou 7 inch Vinyl Mix & Chat',
'description': 'md5:2b8aec6adce69f9d41724647c65875e8',
'description': r're:Last week Dan Snaith aka Caribou swung by the Brownswood.{136}',
'uploader': 'Gilles Peterson Worldwide',
'uploader_id': 'gillespeterson',
'thumbnail': 're:https?://.*',
@ -65,6 +66,23 @@ class MixcloudIE(MixcloudBaseIE):
'timestamp': 1422987057,
'upload_date': '20150203',
},
'params': {
'skip_download': '404 not found',
},
}, {
'url': 'https://www.mixcloud.com/gillespeterson/carnival-m%C3%BAsica-popular-brasileira-mix/',
'info_dict': {
'id': 'gillespeterson_carnival-música-popular-brasileira-mix',
'ext': 'm4a',
'title': 'Carnival Música Popular Brasileira Mix',
'description': r're:Gilles was recently in Brazil to play at Boiler Room.{208}',
'timestamp': 1454347174,
'upload_date': '20160201',
'uploader': 'Gilles Peterson Worldwide',
'uploader_id': 'gillespeterson',
'thumbnail': 're:https?://.*',
'view_count': int,
},
}, {
'url': 'https://beta.mixcloud.com/RedLightRadio/nosedrip-15-red-light-radio-01-18-2016/',
'only_matching': True,
@ -76,10 +94,10 @@ class MixcloudIE(MixcloudBaseIE):
"""Encrypt/Decrypt XOR cipher. Both ways are possible because it's XOR."""
return ''.join([
compat_chr(compat_ord(ch) ^ compat_ord(k))
for ch, k in compat_zip(ciphertext, itertools.cycle(key))])
for ch, k in zip(ciphertext, itertools.cycle(key))])
def _real_extract(self, url):
username, slug = re.match(self._VALID_URL, url).groups()
username, slug = self._match_valid_url(url).groups()
username, slug = compat_urllib_parse_unquote(username), compat_urllib_parse_unquote(slug)
track_id = '%s_%s' % (username, slug)

File diff suppressed because it is too large Load Diff

View File

@ -4,6 +4,7 @@ from __future__ import unicode_literals
import re
from .common import InfoExtractor
from ..utils import (
float_or_none,
get_element_by_id,
@ -11,6 +12,7 @@ from ..utils import (
strip_or_none,
unified_strdate,
urljoin,
str_to_int,
)
@ -35,6 +37,26 @@ class VidLiiIE(InfoExtractor):
'categories': ['News & Politics'],
'tags': ['Vidlii', 'Jan', 'Videogames'],
}
}, {
# HD
'url': 'https://www.vidlii.com/watch?v=2Ng8Abj2Fkl',
'md5': '450e7da379c884788c3a4fa02a3ce1a4',
'info_dict': {
'id': '2Ng8Abj2Fkl',
'ext': 'mp4',
'title': 'test',
'description': 'md5:cc55a86032a7b6b3cbfd0f6b155b52e9',
'thumbnail': 'https://www.vidlii.com/usfi/thmp/2Ng8Abj2Fkl.jpg',
'uploader': 'VidLii',
'uploader_url': 'https://www.vidlii.com/user/VidLii',
'upload_date': '20200927',
'duration': 5,
'view_count': int,
'comment_count': int,
'average_rating': float,
'categories': ['Film & Animation'],
'tags': list,
},
}, {
'url': 'https://www.vidlii.com/embed?v=tJluaH4BJ3v&a=0',
'only_matching': True,
@ -46,11 +68,32 @@ class VidLiiIE(InfoExtractor):
webpage = self._download_webpage(
'https://www.vidlii.com/watch?v=%s' % video_id, video_id)
video_url = self._search_regex(
r'src\s*:\s*(["\'])(?P<url>(?:https?://)?(?:(?!\1).)+)\1', webpage,
'video url', group='url')
formats = []
title = self._search_regex(
def add_format(format_url, height=None):
height = int(self._search_regex(r'(\d+)\.mp4',
format_url, 'height', default=360))
formats.append({
'url': format_url,
'format_id': '%dp' % height if height else None,
'height': height,
})
sources = re.findall(
r'src\s*:\s*(["\'])(?P<url>(?:https?://)?(?:(?!\1).)+)\1',
webpage)
formats = []
if len(sources) > 1:
add_format(sources[1][1])
self._check_formats(formats, video_id)
if len(sources) > 0:
add_format(sources[0][1])
self._sort_formats(formats)
title = self._html_search_regex(
(r'<h1>([^<]+)</h1>', r'<title>([^<]+) - VidLii<'), webpage,
'title')
@ -82,9 +125,9 @@ class VidLiiIE(InfoExtractor):
default=None) or self._search_regex(
r'duration\s*:\s*(\d+)', webpage, 'duration', fatal=False))
view_count = int_or_none(self._search_regex(
(r'<strong>(\d+)</strong> views',
r'Views\s*:\s*<strong>(\d+)</strong>'),
view_count = str_to_int(self._html_search_regex(
(r'<strong>([\d,.]+)</strong> views',
r'Views\s*:\s*<strong>([\d,.]+)</strong>'),
webpage, 'view count', fatal=False))
comment_count = int_or_none(self._search_regex(
@ -109,7 +152,7 @@ class VidLiiIE(InfoExtractor):
return {
'id': video_id,
'url': video_url,
'formats': formats,
'title': title,
'description': description,
'thumbnail': thumbnail,

View File

@ -1,20 +1,38 @@
# coding: utf-8
from __future__ import unicode_literals
import itertools
import re
from time import sleep
from .common import InfoExtractor
from ..utils import (
clean_html,
extract_attributes,
ExtractorError,
get_element_by_class,
get_element_by_id,
int_or_none,
str_to_int,
merge_dicts,
parse_count,
parse_qs,
T,
traverse_obj,
unified_strdate,
url_or_none,
urljoin,
)
class YouPornIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?youporn\.com/(?:watch|embed)/(?P<id>\d+)(?:/(?P<display_id>[^/?#&]+))?'
_VALID_URL = (
r'youporn:(?P<id>\d+)',
r'''(?x)
https?://(?:www\.)?youporn\.com/(?:watch|embed)/(?P<id>\d+)
(?:/(?:(?P<display_id>[^/?#&]+)/?)?)?(?:[#?]|$)
'''
)
_EMBED_REGEX = [r'<iframe[^>]+\bsrc=["\'](?P<url>(?:https?:)?//(?:www\.)?youporn\.com/embed/\d+)']
_TESTS = [{
'url': 'http://www.youporn.com/watch/505835/sex-ed-is-it-safe-to-masturbate-daily/',
'md5': '3744d24c50438cf5b6f6d59feb5055c2',
@ -34,7 +52,7 @@ class YouPornIE(InfoExtractor):
'tags': list,
'age_limit': 18,
},
'skip': 'This video has been disabled',
'skip': 'This video has been deactivated',
}, {
# Unknown uploader
'url': 'http://www.youporn.com/watch/561726/big-tits-awesome-brunette-on-amazing-webcam-show/?from=related3&al=2&from_id=561726&pos=4',
@ -66,57 +84,104 @@ class YouPornIE(InfoExtractor):
}, {
'url': 'https://www.youporn.com/watch/13922959/femdom-principal/',
'only_matching': True,
}, {
'url': 'https://www.youporn.com/watch/16290308/tinderspecial-trailer1/',
'info_dict': {
'id': '16290308',
'age_limit': 18,
'categories': [],
'description': None, # SEO spam using title removed
'display_id': 'tinderspecial-trailer1',
'duration': 298.0,
'ext': 'mp4',
'upload_date': '20201123',
'uploader': 'Ersties',
'tags': [],
'thumbnail': 'https://fi1.ypncdn.com/m=eaSaaTbWx/202011/23/16290308/original/3.jpg',
'timestamp': 1606147564,
'title': 'Tinder In Real Life',
'view_count': int,
}
}]
@staticmethod
def _extract_urls(webpage):
return re.findall(
r'<iframe[^>]+\bsrc=["\']((?:https?:)?//(?:www\.)?youporn\.com/embed/\d+)',
webpage)
@classmethod
def _extract_urls(cls, webpage):
def yield_urls():
for p in cls._EMBED_REGEX:
for m in re.finditer(p, webpage):
yield m.group('url')
return list(yield_urls())
def _real_extract(self, url):
mobj = re.match(self._VALID_URL, url)
video_id = mobj.group('id')
display_id = mobj.group('display_id') or video_id
# A different video ID (data-video-id) is hidden in the page but
# never seems to be used
video_id, display_id = self._match_valid_url(url).group('id', 'display_id')
url = 'http://www.youporn.com/watch/%s' % (video_id,)
webpage = self._download_webpage(
url, video_id, headers={'Cookie': 'age_verified=1'})
definitions = self._download_json(
'https://www.youporn.com/api/video/media_definitions/%s/' % video_id,
display_id)
watchable = self._search_regex(
r'''(<div\s[^>]*\bid\s*=\s*('|")?watch-container(?(2)\2|(?!-)\b)[^>]*>)''',
webpage, 'watchability', default=None)
if not watchable:
msg = re.split(r'\s{4}', clean_html(get_element_by_id(
'mainContent', webpage)) or '')[0]
raise ExtractorError(
('%s says: %s' % (self.IE_NAME, msg))
if msg else 'Video unavailable: no reason found',
expected=True)
# internal ID ?
# video_id = extract_attributes(watchable).get('data-video-id')
playervars = self._search_json(
r'\bplayervars\s*:', webpage, 'playervars', video_id)
def get_fmt(x):
v_url = url_or_none(x.get('videoUrl'))
if v_url:
x['videoUrl'] = v_url
return (x['format'], x)
defs_by_format = dict(traverse_obj(playervars, (
'mediaDefinitions', lambda _, v: v.get('format'), T(get_fmt))))
def get_format_data(f):
if f not in defs_by_format:
return []
return self._download_json(
defs_by_format[f]['videoUrl'], video_id, '{0}-formats'.format(f))
formats = []
for definition in definitions:
if not isinstance(definition, dict):
continue
video_url = url_or_none(definition.get('videoUrl'))
if not video_url:
continue
f = {
'url': video_url,
'filesize': int_or_none(definition.get('videoSize')),
}
height = int_or_none(definition.get('quality'))
# Try to extract only the actual master m3u8 first, avoiding the duplicate single resolution "master" m3u8s
for hls_url in traverse_obj(
get_format_data('hls'),
(lambda _, v: not isinstance(v['defaultQuality'], bool), 'videoUrl'),
(Ellipsis, 'videoUrl')):
formats.extend(self._extract_m3u8_formats(
hls_url, video_id, 'mp4', fatal=False, m3u8_id='hls',
entry_protocol='m3u8_native'))
for f in traverse_obj(get_format_data('mp4'), (
lambda _, v: v.get('videoUrl'), {
'url': ('videoUrl', T(url_or_none)),
'filesize': ('videoSize', T(int_or_none)),
'height': ('quality', T(int_or_none)),
}, T(lambda x: x.get('videoUrl') and x))):
# Video URL's path looks like this:
# /201012/17/505835/720p_1500k_505835/YouPorn%20-%20Sex%20Ed%20Is%20It%20Safe%20To%20Masturbate%20Daily.mp4
# /201012/17/505835/vl_240p_240k_505835/YouPorn%20-%20Sex%20Ed%20Is%20It%20Safe%20To%20Masturbate%20Daily.mp4
# /videos/201703/11/109285532/1080P_4000K_109285532.mp4
# We will benefit from it by extracting some metadata
mobj = re.search(r'(?P<height>\d{3,4})[pP]_(?P<bitrate>\d+)[kK]_\d+', video_url)
mobj = re.search(r'(?P<height>\d{3,4})[pP]_(?P<bitrate>\d+)[kK]_\d+', f['videoUrl'])
if mobj:
if not height:
height = int(mobj.group('height'))
bitrate = int(mobj.group('bitrate'))
f.update({
'format_id': '%dp-%dk' % (height, bitrate),
'tbr': bitrate,
})
f['height'] = height
if not f.get('height'):
f['height'] = int(mobj.group('height'))
f['tbr'] = int(mobj.group('bitrate'))
f['format_id'] = '%dp-%dk' % (f['height'], f['tbr'])
formats.append(f)
self._sort_formats(formats)
webpage = self._download_webpage(
'http://www.youporn.com/watch/%s' % video_id, display_id,
headers={'Cookie': 'age_verified=1'})
title = self._html_search_regex(
r'(?s)<div[^>]+class=["\']watchVideoTitle[^>]+>(.+?)</div>',
webpage, 'title', default=None) or self._og_search_title(
@ -131,8 +196,10 @@ class YouPornIE(InfoExtractor):
thumbnail = self._search_regex(
r'(?:imageurl\s*=|poster\s*:)\s*(["\'])(?P<thumbnail>.+?)\1',
webpage, 'thumbnail', fatal=False, group='thumbnail')
duration = int_or_none(self._html_search_meta(
'video:duration', webpage, 'duration', fatal=False))
duration = traverse_obj(playervars, ('duration', T(int_or_none)))
if duration is None:
duration = int_or_none(self._html_search_meta(
'video:duration', webpage, 'duration', fatal=False))
uploader = self._html_search_regex(
r'(?s)<div[^>]+class=["\']submitByLink["\'][^>]*>(.+?)</div>',
@ -148,11 +215,11 @@ class YouPornIE(InfoExtractor):
view_count = None
views = self._search_regex(
r'(<div[^>]+\bclass=["\']js_videoInfoViews["\']>)', webpage,
'views', default=None)
r'(<div\s[^>]*\bdata-value\s*=[^>]+>)\s*<label>Views:</label>',
webpage, 'views', default=None)
if views:
view_count = str_to_int(extract_attributes(views).get('data-value'))
comment_count = str_to_int(self._search_regex(
view_count = parse_count(extract_attributes(views).get('data-value'))
comment_count = parse_count(self._search_regex(
r'>All [Cc]omments? \(([\d,.]+)\)',
webpage, 'comment count', default=None))
@ -168,7 +235,10 @@ class YouPornIE(InfoExtractor):
r'(?s)Tags:.*?</div>\s*<div[^>]+class=["\']tagBoxContent["\'][^>]*>(.+?)</div>',
'tags')
return {
data = self._search_json_ld(webpage, video_id, expected_type='VideoObject', fatal=False) or {}
data.pop('url', None)
result = merge_dicts(data, {
'id': video_id,
'display_id': display_id,
'title': title,
@ -183,4 +253,442 @@ class YouPornIE(InfoExtractor):
'tags': tags,
'age_limit': age_limit,
'formats': formats,
}
})
# Remove promotional non-description
if result.get('description', '').startswith(
'Watch %s online' % (result['title'],)):
del result['description']
return result
class YouPornListBase(InfoExtractor):
# pattern in '.title-text' element of page section containing videos
_PLAYLIST_TITLEBAR_RE = r'\s+[Vv]ideos\s*$'
_PAGE_RETRY_COUNT = 0 # ie, no retry
_PAGE_RETRY_DELAY = 2 # seconds
def _get_next_url(self, url, pl_id, html):
return urljoin(url, self._search_regex(
r'''<a\s[^>]*?\bhref\s*=\s*("|')(?P<url>(?:(?!\1)[^>])+)\1''',
get_element_by_id('next', html) or '', 'next page',
group='url', default=None))
@classmethod
def _get_title_from_slug(cls, title_slug):
return re.sub(r'[_-]', ' ', title_slug)
def _entries(self, url, pl_id, html=None, page_num=None):
# separates page sections
PLAYLIST_SECTION_RE = (
r'''<div\s[^>]*\bclass\s*=\s*('|")(?:[\w$-]+\s+|\s)*?title-bar(?:\s+[\w$-]+|\s)*\1[^>]*>'''
)
# contains video link
VIDEO_URL_RE = r'''(?x)
<div\s[^>]*\bdata-video-id\s*=\s*('|")\d+\1[^>]*>\s*
(?:<div\b[\s\S]+?</div>\s*)*
<a\s[^>]*\bhref\s*=\s*('|")(?P<url>(?:(?!\2)[^>])+)\2
'''
def yield_pages(url, html=html, page_num=page_num):
fatal = not html
for pnum in itertools.count(start=page_num or 1):
if not html:
html = self._download_webpage(
url, pl_id, note='Downloading page %d' % pnum,
fatal=fatal)
if not html:
break
fatal = False
yield (url, html, pnum)
# explicit page: extract just that page
if page_num is not None:
break
next_url = self._get_next_url(url, pl_id, html)
if not next_url or next_url == url:
break
url, html = next_url, None
def retry_page(msg, tries_left, page_data):
if tries_left <= 0:
return
self.report_warning(msg, pl_id)
sleep(self._PAGE_RETRY_DELAY)
return next(
yield_pages(page_data[0], page_num=page_data[2]), None)
def yield_entries(html):
for frag in re.split(PLAYLIST_SECTION_RE, html):
if not frag:
continue
t_text = get_element_by_class('title-text', frag or '')
if not (t_text and re.search(self._PLAYLIST_TITLEBAR_RE, t_text)):
continue
for m in re.finditer(VIDEO_URL_RE, frag):
video_url = urljoin(url, m.group('url'))
if video_url:
yield self.url_result(video_url)
last_first_url = None
for page_data in yield_pages(url, html=html, page_num=page_num):
# page_data: url, html, page_num
first_url = None
tries_left = self._PAGE_RETRY_COUNT + 1
while tries_left > 0:
tries_left -= 1
for from_ in yield_entries(page_data[1]):
# may get the same page twice instead of empty page
# or (site bug) intead of actual next page
if not first_url:
first_url = from_['url']
if first_url == last_first_url:
# sometimes (/porntags/) the site serves the previous page
# instead but may provide the correct page after a delay
page_data = retry_page(
'Retrying duplicate page...', tries_left, page_data)
if page_data:
first_url = None
break
continue
yield from_
else:
if not first_url and 'no-result-paragarph1' in page_data[1]:
page_data = retry_page(
'Retrying empty page...', tries_left, page_data)
if page_data:
continue
else:
# success/failure
break
# may get an infinite (?) sequence of empty pages
if not first_url:
break
last_first_url = first_url
def _real_extract(self, url, html=None):
# exceptionally, id may be None
m_dict = self._match_valid_url(url).groupdict()
pl_id, page_type, sort = (m_dict.get(k) for k in ('id', 'type', 'sort'))
qs = parse_qs(url)
for q, v in qs.items():
if v:
qs[q] = v[-1]
else:
del qs[q]
base_id = pl_id or 'YouPorn'
title = self._get_title_from_slug(base_id)
if page_type:
title = '%s %s' % (page_type.capitalize(), title)
base_id = [base_id.lower()]
if sort is None:
title += ' videos'
else:
title = '%s videos by %s' % (title, re.sub(r'[_-]', ' ', sort))
base_id.append(sort)
if qs:
ps = ['%s=%s' % item for item in sorted(qs.items())]
title += ' (%s)' % ','.join(ps)
base_id.extend(ps)
pl_id = '/'.join(base_id)
return self.playlist_result(
self._entries(url, pl_id, html=html,
page_num=int_or_none(qs.get('page'))),
playlist_id=pl_id, playlist_title=title)
class YouPornCategoryIE(YouPornListBase):
IE_DESC = 'YouPorn category, with sorting, filtering and pagination'
_VALID_URL = r'''(?x)
https?://(?:www\.)?youporn\.com/
(?P<type>category)/(?P<id>[^/?#&]+)
(?:/(?P<sort>popular|views|rating|time|duration))?/?(?:[#?]|$)
'''
_TESTS = [{
'note': 'Full list with pagination',
'url': 'https://www.youporn.com/category/lingerie/popular/',
'info_dict': {
'id': 'lingerie/popular',
'title': 'Category lingerie videos by popular',
},
'playlist_mincount': 39,
}, {
'note': 'Filtered paginated list with single page result',
'url': 'https://www.youporn.com/category/lingerie/duration/?min_minutes=10',
'info_dict': {
'id': 'lingerie/duration/min_minutes=10',
'title': 'Category lingerie videos by duration (min_minutes=10)',
},
'playlist_maxcount': 30,
}, {
'note': 'Single page of full list',
'url': 'https://www.youporn.com/category/lingerie/popular?page=1',
'info_dict': {
'id': 'lingerie/popular/page=1',
'title': 'Category lingerie videos by popular (page=1)',
},
'playlist_count': 30,
}]
class YouPornChannelIE(YouPornListBase):
IE_DESC = 'YouPorn channel, with sorting and pagination'
_VALID_URL = r'''(?x)
https?://(?:www\.)?youporn\.com/
(?P<type>channel)/(?P<id>[^/?#&]+)
(?:/(?P<sort>rating|views|duration))?/?(?:[#?]|$)
'''
_TESTS = [{
'note': 'Full list with pagination',
'url': 'https://www.youporn.com/channel/x-feeds/',
'info_dict': {
'id': 'x-feeds',
'title': 'Channel X-Feeds videos',
},
'playlist_mincount': 37,
}, {
'note': 'Single page of full list (no filters here)',
'url': 'https://www.youporn.com/channel/x-feeds/duration?page=1',
'info_dict': {
'id': 'x-feeds/duration/page=1',
'title': 'Channel X-Feeds videos by duration (page=1)',
},
'playlist_count': 24,
}]
@staticmethod
def _get_title_from_slug(title_slug):
return re.sub(r'_', ' ', title_slug).title()
class YouPornCollectionIE(YouPornListBase):
IE_DESC = 'YouPorn collection (user playlist), with sorting and pagination'
_VALID_URL = r'''(?x)
https?://(?:www\.)?youporn\.com/
(?P<type>collection)s/videos/(?P<id>\d+)
(?:/(?P<sort>rating|views|time|duration))?/?(?:[#?]|$)
'''
_PLAYLIST_TITLEBAR_RE = r'^\s*Videos\s+in\s'
_TESTS = [{
'note': 'Full list with pagination',
'url': 'https://www.youporn.com/collections/videos/33044251/',
'info_dict': {
'id': '33044251',
'title': 'Collection Sexy Lips videos',
'uploader': 'ph-littlewillyb',
},
'playlist_mincount': 50,
}, {
'note': 'Single page of full list (no filters here)',
'url': 'https://www.youporn.com/collections/videos/33044251/time?page=1',
'info_dict': {
'id': '33044251/time/page=1',
'title': 'Collection Sexy Lips videos by time (page=1)',
'uploader': 'ph-littlewillyb',
},
'playlist_count': 20,
}]
def _real_extract(self, url):
pl_id = self._match_id(url)
html = self._download_webpage(url, pl_id)
playlist = super(YouPornCollectionIE, self)._real_extract(url, html=html)
infos = re.sub(r'\s+', ' ', clean_html(get_element_by_class(
'collection-infos', html)) or '')
title, uploader = self._search_regex(
r'^\s*Collection: (?P<title>.+?) \d+ VIDEOS \d+ VIEWS \d+ days LAST UPDATED From: (?P<uploader>[\w_-]+)',
infos, 'title/uploader', group=('title', 'uploader'), default=(None, None))
return merge_dicts({
'title': playlist['title'].replace(playlist['id'].split('/')[0], title),
'uploader': uploader,
}, playlist) if title else playlist
class YouPornTagIE(YouPornListBase):
IE_DESC = 'YouPorn tag (porntags), with sorting, filtering and pagination'
_VALID_URL = r'''(?x)
https?://(?:www\.)?youporn\.com/
porn(?P<type>tag)s/(?P<id>[^/?#&]+)
(?:/(?P<sort>views|rating|time|duration))?/?(?:[#?]|$)
'''
_PLAYLIST_TITLEBAR_RE = r'^\s*Videos\s+tagged\s'
_PAGE_RETRY_COUNT = 1
_TESTS = [{
'note': 'Full list with pagination',
'url': 'https://www.youporn.com/porntags/austrian',
'info_dict': {
'id': 'austrian',
'title': 'Tag austrian videos',
},
'playlist_mincount': 35,
'expected_warnings': ['Retrying duplicate page'],
}, {
'note': 'Filtered paginated list with single page result',
'url': 'https://www.youporn.com/porntags/austrian/duration/?min_minutes=10',
'info_dict': {
'id': 'austrian/duration/min_minutes=10',
'title': 'Tag austrian videos by duration (min_minutes=10)',
},
# number of videos per page is (row x col) 2x3 + 6x4 + 2, or + 3,
# or more, varying with number of ads; let's set max as 9x4
# NB col 1 may not be shown in non-JS page with site CSS and zoom 100%
'playlist_maxcount': 32,
'expected_warnings': ['Retrying duplicate page', 'Retrying empty page'],
}, {
'note': 'Single page of full list',
'url': 'https://www.youporn.com/porntags/austrian/?page=1',
'info_dict': {
'id': 'austrian/page=1',
'title': 'Tag austrian videos (page=1)',
},
'playlist_mincount': 32,
'playlist_maxcount': 34,
'expected_warnings': ['Retrying duplicate page', 'Retrying empty page'],
}]
# YP tag navigation is broken, loses sort
def _get_next_url(self, url, pl_id, html):
next_url = super(YouPornTagIE, self)._get_next_url(url, pl_id, html)
if next_url:
n = self._match_valid_url(next_url)
if n:
s = n.groupdict().get('sort')
if s:
u = self._match_valid_url(url)
if u:
u = u.groupdict().get('sort')
if s and not u:
n = n.end('sort')
next_url = next_url[:n] + '/' + u + next_url[n:]
return next_url
class YouPornStarIE(YouPornListBase):
IE_DESC = 'YouPorn Pornstar, with description, sorting and pagination'
_VALID_URL = r'''(?x)
https?://(?:www\.)?youporn\.com/
(?P<type>pornstar)/(?P<id>[^/?#&]+)
(?:/(?P<sort>rating|views|duration))?/?(?:[#?]|$)
'''
_PLAYLIST_TITLEBAR_RE = r'^\s*Videos\s+[fF]eaturing\s'
_TESTS = [{
'note': 'Full list with pagination',
'url': 'https://www.youporn.com/pornstar/daynia/',
'info_dict': {
'id': 'daynia',
'title': 'Pornstar Daynia videos',
'description': r're:Daynia Rank \d+ Videos \d+ Views [\d,.]+ .+ Subscribers \d+',
},
'playlist_mincount': 45,
}, {
'note': 'Single page of full list (no filters here)',
'url': 'https://www.youporn.com/pornstar/daynia/?page=1',
'info_dict': {
'id': 'daynia/page=1',
'title': 'Pornstar Daynia videos (page=1)',
'description': 're:.{180,}',
},
'playlist_count': 26,
}]
@staticmethod
def _get_title_from_slug(title_slug):
return re.sub(r'_', ' ', title_slug).title()
def _real_extract(self, url):
pl_id = self._match_id(url)
html = self._download_webpage(url, pl_id)
playlist = super(YouPornStarIE, self)._real_extract(url, html=html)
INFO_ELEMENT_RE = r'''(?x)
<div\s[^>]*\bclass\s*=\s*('|")(?:[\w$-]+\s+|\s)*?pornstar-info-wrapper(?:\s+[\w$-]+|\s)*\1[^>]*>
(?P<info>[\s\S]+?)(?:</div>\s*){6,}
'''
infos = self._search_regex(INFO_ELEMENT_RE, html, 'infos', group='info', default='')
if infos:
infos = re.sub(
r'(?:\s*nl=nl)+\s*', ' ',
re.sub(r'(?u)\s+', ' ', clean_html(
re.sub('\n', 'nl=nl', infos)))).replace('ribe Subsc', '')
return merge_dicts({
'description': infos.strip() or None,
}, playlist)
class YouPornVideosIE(YouPornListBase):
IE_DESC = 'YouPorn video (browse) playlists, with sorting, filtering and pagination'
_VALID_URL = r'''(?x)
https?://(?:www\.)?youporn\.com/
(?:(?P<id>browse)/)?
(?P<sort>(?(id)
(?:duration|rating|time|views)|
(?:most_(?:favou?rit|view)ed|recommended|top_rated)?))
(?:[/#?]|$)
'''
_PLAYLIST_TITLEBAR_RE = r'\s+(?:[Vv]ideos|VIDEOS)\s*$'
_TESTS = [{
'note': 'Full list with pagination (too long for test)',
'url': 'https://www.youporn.com/',
'info_dict': {
'id': 'youporn',
'title': 'YouPorn videos',
},
'only_matching': True,
}, {
'note': 'Full list with pagination (too long for test)',
'url': 'https://www.youporn.com/recommended',
'info_dict': {
'id': 'youporn/recommended',
'title': 'YouPorn videos by recommended',
},
'only_matching': True,
}, {
'note': 'Full list with pagination (too long for test)',
'url': 'https://www.youporn.com/top_rated',
'info_dict': {
'id': 'youporn/top_rated',
'title': 'YouPorn videos by top rated',
},
'only_matching': True,
}, {
'note': 'Full list with pagination (too long for test)',
'url': 'https://www.youporn.com/browse/time',
'info_dict': {
'id': 'browse/time',
'title': 'YouPorn videos by time',
},
'only_matching': True,
}, {
'note': 'Filtered paginated list with single page result',
'url': 'https://www.youporn.com/most_favorited/?res=VR&max_minutes=2',
'info_dict': {
'id': 'youporn/most_favorited/max_minutes=2/res=VR',
'title': 'YouPorn videos by most favorited (max_minutes=2,res=VR)',
},
'playlist_mincount': 10,
'playlist_maxcount': 28,
}, {
'note': 'Filtered paginated list with several pages',
'url': 'https://www.youporn.com/most_favorited/?res=VR&max_minutes=5',
'info_dict': {
'id': 'youporn/most_favorited/max_minutes=5/res=VR',
'title': 'YouPorn videos by most favorited (max_minutes=5,res=VR)',
},
'playlist_mincount': 45,
}, {
'note': 'Single page of full list',
'url': 'https://www.youporn.com/browse/time?page=1',
'info_dict': {
'id': 'browse/time/page=1',
'title': 'YouPorn videos by time (page=1)',
},
'playlist_count': 36,
}]
@staticmethod
def _get_title_from_slug(title_slug):
return 'YouPorn' if title_slug == 'browse' else title_slug

10
youtube_dl/traversal.py Normal file
View File

@ -0,0 +1,10 @@
# coding: utf-8
# TODO: move these utils.fns here and move import to utils
# flake8: noqa
from .utils import (
dict_get,
get_first,
T,
traverse_obj,
)

View File

@ -49,11 +49,14 @@ from .compat import (
compat_cookiejar,
compat_ctypes_WINFUNCTYPE,
compat_datetime_timedelta_total_seconds,
compat_etree_Element,
compat_etree_fromstring,
compat_etree_iterfind,
compat_expanduser,
compat_html_entities,
compat_html_entities_html5,
compat_http_client,
compat_http_cookies,
compat_integer_types,
compat_kwargs,
compat_ncompress as ncompress,
@ -6253,15 +6256,16 @@ if __debug__:
def traverse_obj(obj, *paths, **kwargs):
"""
Safely traverse nested `dict`s and `Iterable`s
Safely traverse nested `dict`s and `Iterable`s, etc
>>> obj = [{}, {"key": "value"}]
>>> traverse_obj(obj, (1, "key"))
"value"
'value'
Each of the provided `paths` is tested and the first producing a valid result will be returned.
The next path will also be tested if the path branched but no results could be found.
Supported values for traversal are `Mapping`, `Iterable` and `re.Match`.
Supported values for traversal are `Mapping`, `Iterable`, `re.Match`, `xml.etree.ElementTree`
(xpath) and `http.cookies.Morsel`.
Unhelpful values (`{}`, `None`) are treated as the absence of a value and discarded.
The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`.
@ -6269,8 +6273,9 @@ def traverse_obj(obj, *paths, **kwargs):
The keys in the path can be one of:
- `None`: Return the current object.
- `set`: Requires the only item in the set to be a type or function,
like `{type}`/`{func}`. If a `type`, returns only values
of this type. If a function, returns `func(obj)`.
like `{type}`/`{type, type, ...}`/`{func}`. If one or more `type`s,
return only values that have one of the types. If a function,
return `func(obj)`.
- `str`/`int`: Return `obj[key]`. For `re.Match`, return `obj.group(key)`.
- `slice`: Branch out and return all values in `obj[key]`.
- `Ellipsis`: Branch out and return a list of all values.
@ -6282,8 +6287,10 @@ def traverse_obj(obj, *paths, **kwargs):
For `Iterable`s, `key` is the enumeration count of the value.
For `re.Match`es, `key` is the group number (0 = full match)
as well as additionally any group names, if given.
- `dict` Transform the current object and return a matching dict.
- `dict`: Transform the current object and return a matching dict.
Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`.
- `any`-builtin: Take the first matching object and return it, resetting branching.
- `all`-builtin: Take all matching objects and return them as a list, resetting branching.
`tuple`, `list`, and `dict` all support nested paths and branches.
@ -6299,10 +6306,8 @@ def traverse_obj(obj, *paths, **kwargs):
@param get_all If `False`, return the first matching result, otherwise all matching ones.
@param casesense If `False`, consider string dictionary keys as case insensitive.
The following are only meant to be used by YoutubeDL.prepare_outtmpl and are not part of the API
The following is only meant to be used by YoutubeDL.prepare_outtmpl and is not part of the API
@param _is_user_input Whether the keys are generated from user input.
If `True` strings get converted to `int`/`slice` if needed.
@param _traverse_string Whether to traverse into objects as strings.
If `True`, any non-compatible object will first be
converted into a string and then traversed into.
@ -6322,7 +6327,6 @@ def traverse_obj(obj, *paths, **kwargs):
expected_type = kwargs.get('expected_type')
get_all = kwargs.get('get_all', True)
casesense = kwargs.get('casesense', True)
_is_user_input = kwargs.get('_is_user_input', False)
_traverse_string = kwargs.get('_traverse_string', False)
# instant compat
@ -6336,10 +6340,8 @@ def traverse_obj(obj, *paths, **kwargs):
type_test = lambda val: try_call(expected_type or IDENTITY, args=(val,))
def lookup_or_none(v, k, getter=None):
try:
with compat_contextlib_suppress(LookupError):
return getter(v, k) if getter else v[k]
except IndexError:
return None
def from_iterable(iterables):
# chain.from_iterable(['ABC', 'DEF']) --> A B C D E F
@ -6361,12 +6363,13 @@ def traverse_obj(obj, *paths, **kwargs):
result = obj
elif isinstance(key, set):
assert len(key) == 1, 'Set should only be used to wrap a single item'
item = next(iter(key))
if isinstance(item, type):
result = obj if isinstance(obj, item) else None
assert len(key) >= 1, 'At least one item is required in a `set` key'
if all(isinstance(item, type) for item in key):
result = obj if isinstance(obj, tuple(key)) else None
else:
result = try_call(item, args=(obj,))
item = next(iter(key))
assert len(key) == 1, 'Multiple items in a `set` key must all be types'
result = try_call(item, args=(obj,)) if not isinstance(item, type) else None
elif isinstance(key, (list, tuple)):
branching = True
@ -6375,9 +6378,11 @@ def traverse_obj(obj, *paths, **kwargs):
elif key is Ellipsis:
branching = True
if isinstance(obj, compat_http_cookies.Morsel):
obj = dict(obj, key=obj.key, value=obj.value)
if isinstance(obj, compat_collections_abc.Mapping):
result = obj.values()
elif is_iterable_like(obj):
elif is_iterable_like(obj, (compat_collections_abc.Iterable, compat_etree_Element)):
result = obj
elif isinstance(obj, compat_re_Match):
result = obj.groups()
@ -6389,9 +6394,11 @@ def traverse_obj(obj, *paths, **kwargs):
elif callable(key):
branching = True
if isinstance(obj, compat_http_cookies.Morsel):
obj = dict(obj, key=obj.key, value=obj.value)
if isinstance(obj, compat_collections_abc.Mapping):
iter_obj = obj.items()
elif is_iterable_like(obj):
elif is_iterable_like(obj, (compat_collections_abc.Iterable, compat_etree_Element)):
iter_obj = enumerate(obj)
elif isinstance(obj, compat_re_Match):
iter_obj = itertools.chain(
@ -6413,6 +6420,8 @@ def traverse_obj(obj, *paths, **kwargs):
if v is not None or default is not NO_DEFAULT) or None
elif isinstance(obj, compat_collections_abc.Mapping):
if isinstance(obj, compat_http_cookies.Morsel):
obj = dict(obj, key=obj.key, value=obj.value)
result = (try_call(obj.get, args=(key,))
if casesense or try_call(obj.__contains__, args=(key,))
else next((v for k, v in obj.items() if casefold(k) == key), None))
@ -6430,12 +6439,40 @@ def traverse_obj(obj, *paths, **kwargs):
else:
result = None
if isinstance(key, (int, slice)):
if is_iterable_like(obj, compat_collections_abc.Sequence):
if is_iterable_like(obj, (compat_collections_abc.Sequence, compat_etree_Element)):
branching = isinstance(key, slice)
result = lookup_or_none(obj, key)
elif _traverse_string:
result = lookup_or_none(str(obj), key)
elif isinstance(obj, compat_etree_Element) and isinstance(key, str):
xpath, _, special = key.rpartition('/')
if not special.startswith('@') and not special.endswith('()'):
xpath = key
special = None
# Allow abbreviations of relative paths, absolute paths error
if xpath.startswith('/'):
xpath = '.' + xpath
elif xpath and not xpath.startswith('./'):
xpath = './' + xpath
def apply_specials(element):
if special is None:
return element
if special == '@':
return element.attrib
if special.startswith('@'):
return try_call(element.attrib.get, args=(special[1:],))
if special == 'text()':
return element.text
raise SyntaxError('apply_specials is missing case for {0!r}'.format(special))
if xpath:
result = list(map(apply_specials, compat_etree_iterfind(obj, xpath)))
else:
result = apply_specials(obj)
return branching, result if branching else (result,)
def lazy_last(iterable):
@ -6456,17 +6493,18 @@ def traverse_obj(obj, *paths, **kwargs):
key = None
for last, key in lazy_last(variadic(path, (str, bytes, dict, set))):
if _is_user_input and isinstance(key, str):
if key == ':':
key = Ellipsis
elif ':' in key:
key = slice(*map(int_or_none, key.split(':')))
elif int_or_none(key) is not None:
key = int(key)
if not casesense and isinstance(key, str):
key = compat_casefold(key)
if key in (any, all):
has_branched = False
filtered_objs = (obj for obj in objs if obj not in (None, {}))
if key is any:
objs = (next(filtered_objs, None),)
else:
objs = (list(filtered_objs),)
continue
if __debug__ and callable(key):
# Verify function signature
_try_bind_args(key, None, None)
@ -6505,9 +6543,9 @@ def traverse_obj(obj, *paths, **kwargs):
return None if default is NO_DEFAULT else default
def T(x):
""" For use in yt-dl instead of {type} or set((type,)) """
return set((x,))
def T(*x):
""" For use in yt-dl instead of {type, ...} or set((type, ...)) """
return set(x)
def get_first(obj, keys, **kwargs):

View File

@ -1,5 +1,5 @@
# Autogenerated by devscripts/update-version.py
from __future__ import unicode_literals
__version__ = '2024.04.08'
RELEASE_GIT_HEAD = 'e0727e4ab61b6e45f7792546b8b5ff52a0ea22b5'
__version__ = '2024.06.12'
RELEASE_GIT_HEAD = '0153b387e57e0bb8e580f1869f85596d2767fb0d'