sync_manager

This commit is contained in:
Amber 2024-10-01 16:30:56 +02:00
parent d244cf190e
commit c9dde4a63c
4 changed files with 306 additions and 63 deletions

View File

@ -6,7 +6,8 @@ from pathlib import Path
from lib.snapshot.generate import local as _genlocal
from lib.snapshot import dump as _dump
from lib.snapshot.generate.remote import RHAgent
# from lib.snapshot.generate.remote import RHAgent
from lib.sclient.agent import AdvancedSftpClientAgent
from lib.diff import fdiff
@ -182,10 +183,10 @@ class Manager():
def get_agent(self):
try:
return self.rhagent
return self.agent
except: pass
self.rhagent = RHAgent(self.hostname, self.username)
return self.rhagent
self.agent = AdvancedSftpClientAgent(self.hostname, self.username)
return self.agent
def get_local_snap_diff(self):
last_tree = self.load_local_hash()
@ -260,16 +261,22 @@ class Manager():
## node is a folder
return agent.check_rfolder_status(absolute_remote_path)
def copy_node(self, node):
pass
# node_type = node['type']
# node_name = node['name']
# node_path = node['rel_path']
#
# agent = self.get_agent()
#
# if node_type == 'd':
# pass
def copy_node_to_remote(self, node):
'''
use put of sftp client
put(localpath, remotepath, callback=None, confirm=True)
'''
absolute_local_path = self.get_absolute_local_path(node)
absolute_remote_path = self.get_absolute_remote_path(node)
agent = self.get_agent()
if node['cur_type'] == 'f':
## simply put
return agent.copy_file_to_remote(absolute_local_path, absolute_remote_path)
elif node['cur_type'] == 'd':
#cur_hash contains the entire subtree to recreate on remote
## it is better to remove the root node if already exists on remote
agent.remove_dir(absolute_remote_path)
def show_conflicts(self):
unmerged = self.load_unmerged_diff()
@ -318,9 +325,11 @@ class Manager():
self.store_fdiff(found)
return
## found the node in unmerged path
unmerged_local_path = self.get_unmerged_local_path(node)
absolute_remote_path = self.get_absolute_remote_path(node)
# unmerged_local_path = self.get_unmerged_local_path(node)
# absolute_remote_path = self.get_absolute_remote_path(node)
# copy node from the unmerged path to remote -> this is
print(f'force local node: {unmerged_local_path} to remote path: {absolute_remote_path}')
self.copy_node_to_remote(node)
@ -338,7 +347,7 @@ class Manager():
# current_tree = self.compute_local_hash()
local_snap_diff = self.get_local_snap_diff()
# rhagent = self.init_rh_agent()
agent = self.get_agent()
# agent = self.get_agent()
changes = local_snap_diff.get('changed') or []
unmerged = []

View File

@ -0,0 +1,2 @@
TYPE_DIR = 'd'
TYPE_FILE = 'f'

View File

@ -1,6 +1,15 @@
import time
import os
import stat
import errno
import asyncio
import hashlib
import io
import functools
from lib.sclient.base_agent import BaseAgent
# import paramiko.sftp_client as _sftp_client
@ -20,7 +29,7 @@ def downLoadFile(sftp, remotePath, localPath):
sftp.get(fileattr.filename, os.path.join(localPath, fileattr.filename))
'''
class SftpClientAgent(BaseAgent):
class AdvancedSftpClientAgent(BaseAgent):
'''
it give to you a sftp client for using to compute remote hash
and basic ftp command to synchronize local and remote
@ -30,17 +39,6 @@ class SftpClientAgent(BaseAgent):
BaseAgent.__init__(self, hostname, username)
self.sftpc = None
def compute_hash(self, name):
self.connect()
stdin, stdout, stderr = self.client.exec_command("sha256sum %s | awk '{print $1}'" % (name, ))
# self.close()
ostream = stdout.read()
estream = stderr.read()
# stdout.read().strip() is the hash
if estream:
print("Command failed for this reason: %s" % (estream))
return ostream.strip()
def get_sftp_client(self):
if self.sftpc:
return self.sftpc
@ -51,6 +49,16 @@ class SftpClientAgent(BaseAgent):
self.sftpc = self.client.open_sftp()
return self.sftpc
# NOT USED
# def compute_hash(self, name):
# self.connect()
# stdin, stdout, stderr = self.client.exec_command("sha256sum %s | awk '{print $1}'" % (name, ))
# ostream = stdout.read()
# estream = stderr.read()
# if estream:
# print("Command failed for this reason: %s" % (estream))
# return ostream.strip()
def test_is_dir(self, path='.'):
sftpc = self.get_sftp_client()
@ -60,29 +68,29 @@ class SftpClientAgent(BaseAgent):
continue
print('name: %s is regular FILE' % (attr.filename))
def put(self, localpath, remotepath, callback=None):
'''
wrap the method put of paramiko sftp client
Copy a local file (localpath) to the SFTP server as remotepath.
Any exception raised by operations will be passed through. This method is primarily provided as a convenience.
'''
sftpc = self.get_sftp_client()
sftpc.put(localpath, remotepath, callback=callback)
# def put(self, localpath, remotepath, callback=None):
# '''
# wrap the method put of paramiko sftp client
# Copy a local file (localpath) to the SFTP server as remotepath.
# Any exception raised by operations will be passed through. This method is primarily provided as a convenience.
# '''
# sftpc = self.get_sftp_client()
# sftpc.put(localpath, remotepath, callback=callback)
#
# def get(self, localpath, remotepath, callback=None):
# sftpc = self.get_sftp_client()
# sftpc.get(localpath, remotepath, callback=callback)
def get(self, localpath, remotepath, callback=None):
sftpc = self.get_sftp_client()
sftpc.get(localpath, remotepath, callback=callback)
# def open(self, remotepath):
# sftpc = self.get_sftp_client()
# buf = ''
# with sftpc.open(remotepath,mode='r') as rfile:
# buf = rfile.read()
# return buf
def open(self, remotepath):
sftpc = self.get_sftp_client()
buf = ''
with sftpc.open(remotepath,mode='r') as rfile:
buf = rfile.read()
return buf
def mkdir(self, path):
sftpc = self.get_sftp_client()
sftpc.mkdir(path)
# def mkdir(self, path):
# sftpc = self.get_sftp_client()
# sftpc.mkdir(path)
def rpath_exits(self, path):
sftpc = self.get_sftp_client()
@ -92,6 +100,208 @@ class SftpClientAgent(BaseAgent):
if e.errno == errno.ENOENT: return False
return True
def remove_dir(self, absolute_path):
'''
Here we assume that absolute_path on remote can be not empty
XXX: the ftp client can remove only empty folder
'''
pass
# def get_sftp_client(self):
# a = self.agent
# return a.get_sftp_client()
def check_rfile_status(self, file_path:str):
try:
iobuf, dig = self.generate_rfile_hash(file_path, return_also_buffer=True)
return {
'exists' : True,
'iobuffer' : iobuf,
'hash': dig,
}
except IOError as e:
if e.errno == errno.ENOENT: return {
'exists' : False,
'iobuffer' : None,
'hash': None
}
raise Exception(f'Something went wrong and strange {file_path}')
def check_rfolder_status(self, path:str):
try:
dig = self.generate_tree_hash_oversftp(path)
return {
'exists' : True,
'iobuffer' : None,
'hash': dig,
}
except IOError as e:
if e.errno == errno.ENOENT: return {
'exists' : False,
'iobuffer' : None,
'hash': None
}
raise Exception(f'Something went wrong and strange {path}')
## move in SftpClientAgent
def copy_file_to_remote(self, localpath, remotepath, callback=None, confirm=False):
'''
put(localpath, remotepath, callback=None, confirm=True)
'''
def def_cback():
print(f'{localpath} copied to {remotepath}, success')
callback = callback if callback else def_cback
client = self.get_sftp_client()
client.put(localpath, remotepath, callback=callback, confirm=confirm)
return
def generate_rfile_hash(self, file_path, hexdigest=True, return_also_buffer=False):
# a = self.agent
client = self.get_sftp_client()
print(f'Trying to get {file_path}')
with client.open(file_path, "rb") as f:
buf = f.read()
if hexdigest:
dig = hashlib.md5(buf).hexdigest()
else:
dig = hashlib.md5(buf).digest()
if not return_also_buffer:
return dig
return (io.BytesIO(buf), dig)
def generate_tree_hash_oversftp(self, root_path :str):
'''
@param root_path string, root_path in remote server
generate a map of hashes starting from `root_path` recursively
'''
rtreemap = {}
if not root_path.endswith(os.path.sep):
root_path = root_path + os.path.sep
# a = self.agent
sftpc = self.get_sftp_client()
for item in sftpc.listdir_attr(root_path):
absolute_item_path = root_path + item.filename
print('absolute_item_path: %s, item %s, isdir: %s' % (absolute_item_path, item.filename, stat.S_ISDIR(item.st_mode)))
if stat.S_ISDIR(item.st_mode):
rtreemap[item.filename] = self.generate_tree_hash_oversftp(absolute_item_path)
else:
rtreemap[item.filename] = self.generate_rfile_hash(absolute_item_path)
return rtreemap
def generate_file_hash_oversftp(self, file_path: str, return_also_buffer: bool=False):
sftpc = self.get_sftp_client()
return self.generate_rfile_hash(file_path, return_also_buffer=return_also_buffer)
async def generate_rfile_hash_async(self, file_path, hexdigest=True):
# a = self.agent
client = self.get_sftp_client()
with client.open(file_path, "rb") as f:
buf = f.read()
if hexdigest: return hashlib.md5(buf).hexdigest()
return hashlib.md5(buf).digest()
async def generate_tree_hash_oversftp_async(self, root_path :str):
'''
@param root_path string, root_path in remote server
generate a map of hashes starting from `root_path` recursively
'''
# if not os.path.isdir(root_path):
# raise Exception('Provide a valid folder to start the hashing')
#
# if not root_path.endswith(os.path.sep):
# root_path = root_path + os.path.sep
# root_path = check_isdir(root_path)
rtreemap = {}
if not root_path.endswith(os.path.sep):
root_path = root_path + os.path.sep
# a = self.agent
sftpc = self.get_sftp_client()
def dtask_done_cback(fname, f):
print('dtask done %s' % (fname,))
rtreemap[fname] = f.result()
def ftask_done_cback(fname, f):
print('ftask done %s' % (fname,))
rtreemap[fname] = f.result()
# futures_map = {}
tasks = []
for item in sftpc.listdir_attr(root_path):
absolute_item_path = root_path + item.filename
print('absolute_item_path: %s, item %s, isdir: %s' % (absolute_item_path, item.filename, stat.S_ISDIR(item.st_mode)))
if stat.S_ISDIR(item.st_mode):
# rtreemap[item.filename] = await generate_tree_hash_oversftp_async(absolute_item_path)
dtask = asyncio.create_task(self.generate_tree_hash_oversftp_async(absolute_item_path))
dtask.add_done_callback(functools.partial(dtask_done_cback, item.filename))
tasks.append(dtask)
else:
# rtreemap[item.filename] = await generate_rfile_hash_async(absolute_item_path, client=sftpc)
ftask = asyncio.create_task(self.generate_rfile_hash_async(absolute_item_path))
ftask.add_done_callback(functools.partial(ftask_done_cback, item.filename))
tasks.append(ftask)
# item.filename
await asyncio.gather(*tasks)
return rtreemap
def test_sync_rtree(path='/home/notanamber/notes_dev/'):
start = time.time()
print('Start task')
agent = AdvancedSftpClientAgent('myvps', 'notanamber')
rtree = agent.generate_tree_hash_oversftp(path)
end = time.time()
print('task done in %.2f' % (end - start))
return rtree
def test_async_rtree(path='/home/notanamber/notes_dev/'):
start = time.time()
print('Start task')
agent = AdvancedSftpClientAgent('myvps', 'notanamber')
rtree = asyncio.run(agent.generate_tree_hash_oversftp_async(path))
end = time.time()
print('task done in %.2f' % (end - start))
return rtree
'''
a
- b
- c
- k.txt
i.txt
g.txt
j.txt
k.txt
tree['a'] = {
'b' : {
},
'j.txt' : '012349jasdfh9934',
}
'''
# synca = SyncAgent()
# sftpc = a.get_sftp_client()

View File

@ -12,6 +12,11 @@ import functools
from lib.sclient.agent import SftpClientAgent
'''
DO NOT USE THIS: move this methods in AdvancedSftpClientAgent
'''
class RHAgent():
'''
Give you an sftp connection that you can use for generate remote file hash
@ -23,13 +28,20 @@ class RHAgent():
self.username = username
self.agent = SftpClientAgent(hostname, username)
def get_sftp_client():
def remove_dir(self, absolute_path):
'''
Here we assume that absolute_path on remote can be not empty
XXX: the ftp client can remove only empty folder
'''
pass
def get_sftp_client(self):
a = self.agent
return a.get_sftp_client()
def generate_rfile_hash(self, file_path, hexdigest=True, return_also_buffer=False):
a = self.agent
client = a.get_sftp_client()
# a = self.agent
client = self.get_sftp_client()
print(f'Trying to get {file_path}')
@ -78,8 +90,18 @@ class RHAgent():
}
raise Exception(f'Something went wrong and strange {path}')
def copy_node(self):
pass
## move in SftpClientAgent
def copy_file_to_remote(self, localpath, remotepath, callback=None, confirm=False):
'''
put(localpath, remotepath, callback=None, confirm=True)
'''
def def_cback():
print(f'{localpath} copied to {remotepath}, success')
callback = callback if callback else def_cback
client = self.get_sftp_client()
client.put(localpath, remotepath, callback=callback, confirm=confirm)
return
def generate_tree_hash_oversftp(self, root_path :str):
@ -94,8 +116,8 @@ class RHAgent():
if not root_path.endswith(os.path.sep):
root_path = root_path + os.path.sep
a = self.agent
sftpc = a.get_sftp_client()
# a = self.agent
sftpc = self.get_sftp_client()
for item in sftpc.listdir_attr(root_path):
absolute_item_path = root_path + item.filename
@ -108,13 +130,13 @@ class RHAgent():
return rtreemap
def generate_file_hash_oversftp(self, file_path: str, return_also_buffer: bool=False):
a = self.agent
sftpc = a.get_sftp_client()
# a = self.agent
sftpc = self.get_sftp_client()
return self.generate_rfile_hash(file_path, return_also_buffer=return_also_buffer)
async def generate_rfile_hash_async(self, file_path, hexdigest=True):
a = self.agent
client = a.get_sftp_client()
# a = self.agent
client = self.get_sftp_client()
with client.open(file_path, "rb") as f:
buf = f.read()
@ -139,8 +161,8 @@ class RHAgent():
if not root_path.endswith(os.path.sep):
root_path = root_path + os.path.sep
a = self.agent
sftpc = a.get_sftp_client()
# a = self.agent
sftpc = self.get_sftp_client()
def dtask_done_cback(fname, f):
print('dtask done %s' % (fname,))