Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 79 additions & 4 deletions swift/obj/auditor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@
DiskFileDeleted, DiskFileExpired, QuarantineRequest
from swift.common.daemon import Daemon
from swift.common.storage_policy import POLICIES
from swift.common.internal_client import InternalClient, UnexpectedResponse
from swift.common.utils import (
config_auto_int_value, dump_recon_cache, get_logger, list_from_csv,
listdir, load_pkg_resource, parse_prefixed_conf, ratelimit_sleep,
readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter)
readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter,
split_path, Timestamp)
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
HTTP_PRECONDITION_FAILED
from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH


Expand Down Expand Up @@ -92,13 +96,20 @@ def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0,
self.total_files_processed = 0
self.passes = 0
self.quarantines = 0
self.expired = 0
self.errors = 0
self.rcache = rcache
self.stats_sizes = sorted(
[int(s) for s in list_from_csv(conf.get('object_size_stats'))])
self.stats_buckets = dict(
[(s, 0) for s in self.stats_sizes + ['OVER']])

self.delete_expired = bool(self.conf.get('delete_expired') or False)
self.expired_grace = int(self.conf.get('expired_grace_seconds') or 691200)
self.ic_conf_path = \
self.conf.get('internal_client_conf_path') or \
'/etc/swift/internal-client.conf'

self.watchers = [
WatcherWrapper(wdef['klass'], name, wdef['conf'], logger)
for name, wdef in watcher_defs.items()]
Expand Down Expand Up @@ -130,6 +141,7 @@ def audit_all_objects(self, mode='once', device_dirs=None):
self.total_files_processed = 0
total_quarantines = 0
total_errors = 0
total_expired = 0
time_auditing = 0

# get AuditLocations for each policy
Expand All @@ -155,13 +167,14 @@ def audit_all_objects(self, mode='once', device_dirs=None):
'Object audit (%(type)s). '
'Since %(start_time)s: Locally: %(passes)d passed, '
'%(quars)d quarantined, %(errors)d errors, '
'%(expired)d expired, '
'files/sec: %(frate).2f, bytes/sec: %(brate).2f, '
'Total time: %(total).2f, Auditing time: %(audit).2f, '
'Rate: %(audit_rate).2f', {
'type': '%s%s' % (self.auditor_type, description),
'start_time': time.ctime(reported),
'passes': self.passes, 'quars': self.quarantines,
'errors': self.errors,
'errors': self.errors, 'expired': self.expired,
'frate': self.passes / (now - reported),
'brate': self.bytes_processed / (now - reported),
'total': (now - begin), 'audit': time_auditing,
Expand All @@ -170,16 +183,18 @@ def audit_all_objects(self, mode='once', device_dirs=None):
'object_auditor_stats_%s' % (self.auditor_type),
device_dirs,
{'errors': self.errors, 'passes': self.passes,
'quarantined': self.quarantines,
'quarantined': self.quarantines, 'expired': self.expired,
'bytes_processed': self.bytes_processed,
'start_time': reported, 'audit_time': time_auditing})
dump_recon_cache(cache_entry, self.rcache, self.logger)
reported = now
total_quarantines += self.quarantines
total_errors += self.errors
total_expired += self.expired
self.passes = 0
self.quarantines = 0
self.errors = 0
self.expired = 0
self.bytes_processed = 0
self.last_logged = now
time_auditing += (now - loop_time)
Expand All @@ -195,6 +210,7 @@ def audit_all_objects(self, mode='once', device_dirs=None):
'mode': mode, 'elapsed': elapsed,
'quars': total_quarantines + self.quarantines,
'errors': total_errors + self.errors,
'expired': total_expired + self.expired,
'frate': self.total_files_processed / elapsed,
'brate': self.total_bytes_processed / elapsed,
'audit': time_auditing, 'audit_rate': time_auditing / elapsed})
Expand Down Expand Up @@ -262,6 +278,14 @@ def raise_dfq(msg):
self.record_stats(obj_size)
if obj_size and not self.zero_byte_only_at_fps:
reader = df.reader(_quarantine_hook=raise_dfq)
# Trigger DiskFileExpired on auditors running in
# open_expired=True mode if delete_expired is active
if self.delete_expired:
if 'X-Delete-At' in metadata:
x_delete_at = int(Timestamp(metadata['X-Delete-At']))
current_time = int(Timestamp(time.time()))
if x_delete_at <= current_time - int(self.expired_grace):
raise DiskFileExpired
if reader:
with closing(reader):
for chunk in reader:
Expand All @@ -287,7 +311,22 @@ def raise_dfq(msg):
' quarantined: %(err)s',
{'obj': location, 'err': err})
except DiskFileExpired:
pass # ignore expired objects
# The object expirer sometimes fails to remove expired files, is
# stopped for too long, or the object is never added to its queue.
# We try to mimic the behaviour of the expirer here.
# Note that this is more of a failsafe and exception handler -- the
# expirer is much more efficient at this work!
if self.delete_expired:
try:
self.delete_expired_object(location, diskfile_mgr)
except:
self.errors += 1
self.logger.error('ERROR Object %(obj)s is expired '
'but could not be deleted!',
{'obj': location})
else:
# Retain old behavior if not configured to check for expired objects
pass
except DiskFileDeleted:
# If there is a reclaimable tombstone, we'll invalidate the hash
# to trigger the replicator to rehash/cleanup this suffix
Expand All @@ -309,6 +348,42 @@ def raise_dfq(msg):
mtime = time.time() - self.rsync_tempfile_timeout
unlink_paths_older_than(rsync_tempfile_paths, mtime)

def delete_expired_object(self, location, diskfile_mgr):
    """
    Failsafe deletion of an expired object the expirer never removed.

    Opens the diskfile at ``location`` with ``open_expired=True`` and, if
    its X-Delete-At is older than ``self.expired_grace`` seconds, issues a
    conditional DELETE through an InternalClient, mimicking the expirer's
    ``delete_actual_object``.

    :param location: AuditLocation of the expired object
    :param diskfile_mgr: the DiskFileManager for the object's policy
    :raises UnexpectedResponse: if the DELETE fails with a status other
        than 404 (already gone) or 412 (object was overwritten)
    """
    df_expired = diskfile_mgr.get_diskfile_from_audit_location(
        location, open_expired=True)
    with df_expired.open(modernize=True):
        metadata = df_expired.get_metadata()
        x_delete_at = Timestamp(metadata['X-Delete-At'])
        current_time = Timestamp(time.time())
        # Only act once the expiry is older than the grace period
        # (default 691200s = 8 days: the expirer's one-week reclamation
        # window plus a day).  By then the expirer has had its chance
        # and evidently given up.
        if int(x_delete_at) > int(current_time) - int(self.expired_grace):
            return
        # Mimic the expirer's delete_actual_object(): X-If-Delete-At
        # makes the DELETE conditional so a newer write is never
        # removed, and we leave the expirer queue alone -- the auditor
        # never enqueued anything for this object.
        headers = {'X-Timestamp': current_time.normal,
                   'X-If-Delete-At': x_delete_at.normal,
                   'X-Backend-Clean-Expiring-Object-Queue': 'no'}
        path = metadata.get('name', '')
        try:
            self._expired_object_client().delete_object(
                *split_path(path, 3, 3, True),
                headers=headers,
                acceptable_statuses=(2, HTTP_CONFLICT))
            self.expired += 1
        except UnexpectedResponse as err:
            # 404: someone else (the expirer, or another auditor worker)
            # already deleted it; 412: X-If-Delete-At no longer matches
            # because the object was overwritten.  Both mean there is
            # nothing left for us to do.
            if err.resp.status_int not in {
                    HTTP_NOT_FOUND, HTTP_PRECONDITION_FAILED}:
                raise

def _expired_object_client(self):
    """
    Return a lazily-created, cached InternalClient for expired deletes.

    Building a fresh InternalClient per expired object (as the original
    code did) is wasteful; one client per auditor worker suffices.
    """
    ic = getattr(self, '_expired_ic', None)
    if ic is None:
        request_tries = int(self.conf.get('request_tries') or 3)
        log_route = 'object-auditor'
        ic = InternalClient(
            self.ic_conf_path, 'Swift Object Auditor Delete Expired',
            request_tries, use_replication_network=True,
            global_conf={'log_name': '%s-ic' % self.conf.get(
                'log_name', log_route)})
        self._expired_ic = ic
    return ic


class ObjectAuditor(Daemon):
"""Audit objects."""
Expand Down
8 changes: 4 additions & 4 deletions swift/obj/diskfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -1475,7 +1475,7 @@ def object_audit_location_generator(self, policy, device_dirs=None,
self.logger, device_dirs,
auditor_type)

def get_diskfile_from_audit_location(self, audit_location):
def get_diskfile_from_audit_location(self, audit_location, **kwargs):
"""
Returns a BaseDiskFile instance for an object at the given
AuditLocation.
Expand All @@ -1485,7 +1485,7 @@ def get_diskfile_from_audit_location(self, audit_location):
dev_path = self.get_dev_path(audit_location.device, mount_check=False)
return self.diskfile_cls.from_hash_dir(
self, audit_location.path, dev_path,
audit_location.partition, policy=audit_location.policy)
audit_location.partition, policy=audit_location.policy, **kwargs)

def get_diskfile_and_filenames_from_hash(self, device, partition,
object_hash, policy, **kwargs):
Expand Down Expand Up @@ -2483,9 +2483,9 @@ def content_type_timestamp(self):
return Timestamp(t)

@classmethod
def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition, policy):
def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition, policy, **kwargs):
return cls(mgr, device_path, partition, _datadir=hash_dir_path,
policy=policy)
policy=policy, **kwargs)

def open(self, modernize=False, current_time=None):
"""
Expand Down