diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index e0d95bb4d9..a8837cb893 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -28,10 +28,14 @@ DiskFileDeleted, DiskFileExpired, QuarantineRequest from swift.common.daemon import Daemon from swift.common.storage_policy import POLICIES +from swift.common.internal_client import InternalClient, UnexpectedResponse from swift.common.utils import ( config_auto_int_value, dump_recon_cache, get_logger, list_from_csv, listdir, load_pkg_resource, parse_prefixed_conf, ratelimit_sleep, - readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter) + readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter, + split_path, Timestamp) +from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \ + HTTP_PRECONDITION_FAILED from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH @@ -92,6 +96,7 @@ def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0, self.total_files_processed = 0 self.passes = 0 self.quarantines = 0 + self.expired = 0 self.errors = 0 self.rcache = rcache self.stats_sizes = sorted( @@ -99,6 +104,12 @@ def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0, self.stats_buckets = dict( [(s, 0) for s in self.stats_sizes + ['OVER']]) + self.delete_expired = bool(self.conf.get('delete_expired') or False) + self.expired_grace = int(self.conf.get('expired_grace_seconds') or 691200) + self.ic_conf_path = \ + self.conf.get('internal_client_conf_path') or \ + '/etc/swift/internal-client.conf' + self.watchers = [ WatcherWrapper(wdef['klass'], name, wdef['conf'], logger) for name, wdef in watcher_defs.items()] @@ -130,6 +141,7 @@ def audit_all_objects(self, mode='once', device_dirs=None): self.total_files_processed = 0 total_quarantines = 0 total_errors = 0 + total_expired = 0 time_auditing = 0 # get AuditLocations for each policy @@ -155,13 +167,14 @@ def audit_all_objects(self, mode='once', device_dirs=None): 'Object 
audit (%(type)s). ' 'Since %(start_time)s: Locally: %(passes)d passed, ' '%(quars)d quarantined, %(errors)d errors, ' + '%(expired)d expired, ' 'files/sec: %(frate).2f, bytes/sec: %(brate).2f, ' 'Total time: %(total).2f, Auditing time: %(audit).2f, ' 'Rate: %(audit_rate).2f', { 'type': '%s%s' % (self.auditor_type, description), 'start_time': time.ctime(reported), 'passes': self.passes, 'quars': self.quarantines, - 'errors': self.errors, + 'errors': self.errors, 'expired': self.expired, 'frate': self.passes / (now - reported), 'brate': self.bytes_processed / (now - reported), 'total': (now - begin), 'audit': time_auditing, @@ -170,16 +183,18 @@ def audit_all_objects(self, mode='once', device_dirs=None): 'object_auditor_stats_%s' % (self.auditor_type), device_dirs, {'errors': self.errors, 'passes': self.passes, - 'quarantined': self.quarantines, + 'quarantined': self.quarantines, 'expired': self.expired, 'bytes_processed': self.bytes_processed, 'start_time': reported, 'audit_time': time_auditing}) dump_recon_cache(cache_entry, self.rcache, self.logger) reported = now total_quarantines += self.quarantines total_errors += self.errors + total_expired += self.expired self.passes = 0 self.quarantines = 0 self.errors = 0 + self.expired = 0 self.bytes_processed = 0 self.last_logged = now time_auditing += (now - loop_time) @@ -195,6 +210,7 @@ def audit_all_objects(self, mode='once', device_dirs=None): 'mode': mode, 'elapsed': elapsed, 'quars': total_quarantines + self.quarantines, 'errors': total_errors + self.errors, + 'expired': total_expired + self.expired, 'frate': self.total_files_processed / elapsed, 'brate': self.total_bytes_processed / elapsed, 'audit': time_auditing, 'audit_rate': time_auditing / elapsed}) @@ -262,6 +278,14 @@ def raise_dfq(msg): self.record_stats(obj_size) if obj_size and not self.zero_byte_only_at_fps: reader = df.reader(_quarantine_hook=raise_dfq) + # Trigger DiskFileExpired on auditors running in + # open_expired=True mode if delete_expired is 
def delete_expired_object(self, location, diskfile_mgr):
    """
    Delete an expired object that the object expirer did not remove.

    The diskfile at ``location`` is opened with ``open_expired=True`` so
    that its metadata can be read even though it has expired. If its
    ``X-Delete-At`` is at least ``self.expired_grace`` seconds in the past,
    a conditional DELETE is sent through an InternalClient, using the same
    headers as the expirer's ``delete_actual_object``. ``self.expired`` is
    incremented once the DELETE is accepted (2xx or 409).

    A 404 or 412 response to the DELETE is ignored and not counted.

    :param location: the AuditLocation of the object to check
    :param diskfile_mgr: the diskfile manager used to open the object
    :raises UnexpectedResponse: if the DELETE is answered with a status
                                other than 2xx, 404, 409 or 412
    """
    df_expired = diskfile_mgr.get_diskfile_from_audit_location(
        location, open_expired=True)
    # Only the metadata is needed, so release the diskfile before any
    # network request is made.
    with df_expired.open(modernize=True):
        metadata = df_expired.get_metadata()

    if 'X-Delete-At' not in metadata:
        # Nothing to expire; avoid a KeyError on objects without the header.
        return

    x_delete_at = Timestamp(metadata['X-Delete-At'])
    current_time = Timestamp(time.time())
    # The expirer keeps retrying for its reclamation age; only step in once
    # the configured grace period has elapsed since X-Delete-At as well.
    if int(x_delete_at) > int(current_time) - int(self.expired_grace):
        return

    # Building an InternalClient is expensive, so create it lazily, only
    # when a DELETE is really going to be sent, and reuse it afterwards.
    ic = getattr(self, '_expired_ic', None)
    if ic is None:
        request_tries = int(self.conf.get('request_tries') or 3)
        log_route = 'object-auditor'
        ic = InternalClient(
            self.ic_conf_path, 'Swift Object Auditor Delete Expired',
            request_tries, use_replication_network=True,
            global_conf={'log_name': '%s-ic' % self.conf.get(
                'log_name', log_route)})
        self._expired_ic = ic

    headers = {'X-Timestamp': current_time.normal,
               'X-If-Delete-At': x_delete_at.normal,
               'X-Backend-Clean-Expiring-Object-Queue': 'no'}
    path = metadata.get('name', '')
    try:
        ic.delete_object(*split_path(path, 3, 3, True),
                         headers=headers,
                         acceptable_statuses=(2, HTTP_CONFLICT))
    except UnexpectedResponse as err:
        # 404: already gone; 412: X-Delete-At no longer matches.
        if err.resp.status_int not in {
                HTTP_NOT_FOUND, HTTP_PRECONDITION_FAILED}:
            raise
    else:
        self.expired += 1
@classmethod
def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition,
                  policy, **kwargs):
    """
    Alternate constructor: build an instance from an object's hash dir.

    :param mgr: passed through as the first constructor argument
    :param hash_dir_path: passed to the constructor as ``_datadir``
    :param device_path: passed through as the second constructor argument
    :param partition: passed through as the third constructor argument
    :param policy: passed to the constructor as ``policy``
    :param kwargs: any extra keyword arguments are forwarded unchanged to
                   the constructor (e.g. ``open_expired``)
    :returns: whatever ``cls(...)`` returns
    """
    instance = cls(
        mgr, device_path, partition,
        _datadir=hash_dir_path, policy=policy, **kwargs)
    return instance