From 87c796cbe0340339a484041d4930c53e70f79959 Mon Sep 17 00:00:00 2001 From: Adam Cox Date: Thu, 12 Mar 2026 23:13:16 -0500 Subject: [PATCH] split core models.py into separate files --- ohmg/core/models.py | 1173 ---------------------------------- ohmg/core/models/__init__.py | 6 + ohmg/core/models/document.py | 188 ++++++ ohmg/core/models/layer.py | 226 +++++++ ohmg/core/models/layerset.py | 198 ++++++ ohmg/core/models/map.py | 454 +++++++++++++ ohmg/core/models/mapgroup.py | 33 + ohmg/core/models/region.py | 155 +++++ ohmg/core/utils/__init__.py | 15 + 9 files changed, 1275 insertions(+), 1173 deletions(-) delete mode 100644 ohmg/core/models.py create mode 100644 ohmg/core/models/__init__.py create mode 100644 ohmg/core/models/document.py create mode 100644 ohmg/core/models/layer.py create mode 100644 ohmg/core/models/layerset.py create mode 100644 ohmg/core/models/map.py create mode 100644 ohmg/core/models/mapgroup.py create mode 100644 ohmg/core/models/region.py diff --git a/ohmg/core/models.py b/ohmg/core/models.py deleted file mode 100644 index 981e307e..00000000 --- a/ohmg/core/models.py +++ /dev/null @@ -1,1173 +0,0 @@ -import json -import logging -import shutil -import urllib.parse -from datetime import datetime -from pathlib import Path -from typing import Iterable, Union - -from django.conf import settings -from django.contrib.auth import get_user_model -from django.contrib.auth.models import Group -from django.contrib.contenttypes.models import ContentType -from django.contrib.gis.db import models -from django.contrib.gis.geos import GEOSGeometry, MultiPolygon, Polygon -from django.contrib.postgres.fields import ArrayField -from django.core.files import File -from django.core.files.base import ContentFile -from django.db import transaction -from django.db.models import Q -from django.utils.functional import cached_property -from django.utils.safestring import mark_safe -from natsort import natsorted - -from ohmg.places.models import Place - -from .storages import get_file_url -from .utils import ( - MONTH_CHOICES, - slugify, -) -from .utils.image import ( - convert_img_format, - generate_document_thumbnail_content, - generate_layer_thumbnail_content, - get_extent_from_file, - get_image_size, -) -from .utils.requests import download_image - -logger = logging.getLogger(__name__) - - -class MapGroup(models.Model): - class Meta: - verbose_name_plural = " Map Groups" - - MAP_PREFIX_CHOICES = ( - ("volume", "volume"), - ("part", "part"), - ) - - MAP_PREFIX_ABBREVIATIONS = { - "volume": "Vol.", - "part": "Pt.", - } - - title = models.CharField(max_length=200) - slug = models.SlugField(max_length=100) - year_start = models.IntegerField(blank=True, null=True) - year_end = models.IntegerField(blank=True, null=True) - creator = models.CharField(max_length=200) - publisher = models.CharField(max_length=200) - map_prefix = models.CharField( - max_length=10, - choices=MAP_PREFIX_CHOICES, - null=True, - blank=True, - help_text="The preferred term for referring to maps within this map group.", - ) - - def __str__(self): - return self.title - - -def get_session_user_summary(session_list): - users = session_list.values_list("user__username", flat=True) - user_dict = {} - for name in users: - user_dict[name] = user_dict.get( - name, - { - "ct": 0, - "name": name, - }, - ) - user_dict[name]["ct"] += 1 - return sorted(user_dict.values(), key=lambda item: item.get("ct"), reverse=True) - - -class Map(models.Model): - class Meta: - verbose_name_plural = " Maps" - - ACCESS_CHOICES = ( - ("none", "none"), - ("sponsor", "sponsor"), - ("any", "any"), - ) - ACCESS_LEVEL_CHOICES = ( - ("public", "Public"), - ("restricted", "Restricted"), - ("none", "None"), - ) - - DOCUMENT_PREFIX_CHOICES = ( - ("page", "page"), - ("sheet", "sheet"), - ("plate", "plate"), - ("part", "part"), - ) - - DOCUMENT_PREFIX_ABBREVIATIONS = { - "page": "p", - "sheet": "s", - "plate": "pl", - "part": "pt", - } - - identifier = models.CharField(max_length=100, primary_key=True) - slug = models.SlugField(max_length=100) - title = models.CharField(max_length=200) - year = models.IntegerField(blank=True, null=True) - month = models.IntegerField(blank=True, null=True, choices=MONTH_CHOICES) - creator = models.CharField( - max_length=200, - null=True, - blank=True, - ) - publisher = models.CharField( - max_length=200, - null=True, - blank=True, - ) - volume_number = models.CharField( - max_length=25, - null=True, - blank=True, - help_text="Volume number (or name?), if this map is included in a MapGroup.", - ) - document_page_type = models.CharField( - max_length=10, - choices=DOCUMENT_PREFIX_CHOICES, - null=True, - blank=True, - help_text="The preferred term for referring to documents within this map.", - ) - iiif_manifest = models.JSONField(null=True, blank=True) - create_date = models.DateTimeField(auto_now_add=True) - load_date = models.DateTimeField(null=True, blank=True) - loading_documents = models.BooleanField( - default=False, - help_text="true only when document files are being loaded for this map", - ) - document_sources = models.JSONField( - null=True, - blank=True, - default=dict, - ) - item_lookup = models.JSONField( - null=True, - blank=True, - default=dict, - ) - featured = models.BooleanField(default=False, help_text="show in featured section") - hidden = models.BooleanField( - default=False, - help_text="this map will be excluded from api calls (but url available directly)", - ) - locales = models.ManyToManyField( - Place, - blank=True, - ) - mapgroup = models.ForeignKey( - MapGroup, null=True, blank=True, on_delete=models.SET_NULL, related_name="maps" - ) - access = models.CharField(max_length=50, choices=ACCESS_CHOICES, default="any") - access_level = models.CharField(max_length=50, choices=ACCESS_LEVEL_CHOICES, default="any") - user_access = models.ManyToManyField( - settings.AUTH_USER_MODEL, - blank=True, - related_name="maps_allowed", - ) - group_access = models.ManyToManyField( - Group, - blank=True, - related_name="maps_allowed", - ) - sponsor = models.ForeignKey( - settings.AUTH_USER_MODEL, - blank=True, - null=True, - on_delete=models.SET_NULL, - related_name="maps_sponsored", - ) - loaded_by = models.ForeignKey( - settings.AUTH_USER_MODEL, - blank=True, - null=True, - on_delete=models.CASCADE, - related_name="maps_loaded", - ) - document_ct = models.IntegerField( - default=0, - ) - unprepared_ct = models.IntegerField( - default=0, - ) - region_ct = models.IntegerField( - default=0, - ) - prepared_ct = models.IntegerField( - default=0, - ) - layer_ct = models.IntegerField( - default=0, - ) - skip_ct = models.IntegerField( - default=0, - ) - nonmap_ct = models.IntegerField( - default=0, - ) - completion_pct = models.IntegerField( - default=0, - ) - multimask_ct = models.IntegerField( - default=0, - ) - multimask_rank = models.DecimalField( - max_digits=7, - decimal_places=6, - default=0.000000, - ) - - def __str__(self): - return self.title - - @property - def regions(self): - return Region.objects.filter(document__in=self.documents.all()).order_by("title") - - @property - def layers(self): - return Layer.objects.filter(region__in=self.regions).order_by("title") - - @property - def extent(self): - layerset_extents = [] - for ls in self.layerset_set.all(): - if ls.extent: - poly = Polygon().from_bbox(ls.extent) - layerset_extents.append(poly) - if layerset_extents: - return MultiPolygon(layerset_extents).extent - else: - return None - - @property - def gt_exists(self): - return ( - True - if self.get_layerset("main-content") - and self.get_layerset("main-content").mosaic_geotiff - else False - ) - - @property - def mj_exists(self): - return ( - True - if self.get_layerset("main-content") and self.get_layerset("main-content").mosaic_json - else False - ) - - @property - def stats(self): - unprep_ct = len(self.item_lookup["unprepared"]) - prep_ct = len(self.item_lookup["prepared"]) - georef_ct = len(self.item_lookup["georeferenced"]) - skipped_ct = len(self.item_lookup["skipped"]) - percent = 0 - if georef_ct > 0: - percent = int((georef_ct / (unprep_ct + prep_ct + georef_ct)) * 100) - - main_layerset = self.get_layerset("main-content") - if main_layerset: - main_lyrs_ct = main_layerset.get_layers().count() - else: - main_lyrs_ct = 0 - mm_ct, mm_todo, mm_percent = 0, 0, 0 - if main_lyrs_ct != 0: - # make sure 0/0 appears at the very bottom, then 0/1, 0/2, etc. - mm_percent = main_lyrs_ct * 0.000001 - mm_display = f"0/{main_lyrs_ct}" - if main_layerset and main_layerset.multimask is not None: - mm_ct = len(main_layerset.multimask) - mm_todo = main_lyrs_ct - mm_ct - if mm_ct > 0 and main_lyrs_ct > 0: - mm_display = f"{mm_ct}/{main_lyrs_ct}" - mm_percent = mm_ct / main_lyrs_ct - mm_percent += main_lyrs_ct * 0.000001 - - return { - "unprepared_ct": unprep_ct, - "prepared_ct": prep_ct, - "georeferenced_ct": georef_ct, - "skipped_ct": skipped_ct, - "percent": percent, - "mm_ct": mm_todo, - "mm_display": mm_display, - "mm_percent": mm_percent, - } - - def get_locale(self, serialized=False): - """Returns the first locale in the list of related locales. - This is a patch in use until the frontend is ready for multiple - locales per item.""" - if len(self.locales.all()) > 0: - locale = self.locales.all()[0] - if serialized: - return locale.serialize() - else: - return locale - else: - return None - - def get_layerset(self, cat_slug: str, create: bool = False): - try: - layerset = LayerSet.objects.get(map=self, category__slug=cat_slug) - except LayerSet.DoesNotExist: - if create: - category = LayerSetCategory.objects.get(slug=cat_slug) - layerset = LayerSet.objects.create(map=self, category=category) - logger.debug(f"created new LayerSet: {self.pk} - {cat_slug}") - else: - layerset = None - return layerset - - def create_documents(self): - """Iterates the list of items in self.document_sources and create Document - objects for each one. If get_files=True, load files from their path. - - A document source entry should look like: - { - path: - iiif_info: - page_number: - } - """ - for source in self.document_sources: - document, created = Document.objects.get_or_create( - map=self, - page_number=source["page_number"], - ) - document.source_url = source["path"] - document.iiif_info = source["iiif_info"] - document.save(skip_map_lookup_update=True) - if created: - logger.debug(f"{document} ({document.pk}) created.") - logger.debug(f"Map {self.title} ({self.pk}) has {len(self.documents.all())} Documents") - self.update_item_lookup() - - def load_all_document_files(self, username, overwrite=False): - self.loading_documents = True - self.save() - for document in natsorted(self.documents.all(), key=lambda k: k.title): - if not document.file and not overwrite: - try: - document.load_file_from_source(username, overwrite=True) - except Exception as e: - logger.error(f"error loading document {document.pk}: {e}") - document.loading_file = False - document.save() - self.loading_documents = False - self.save() - - def remove_sheets(self): - for document in self.documents: - document.delete() - - def update_place_counts(self): - locale = self.get_locale() - if locale is not None: - with transaction.atomic(): - locale.volume_count += 1 - locale.volume_count_inclusive += 1 - locale.save(update_fields=["volume_count", "volume_count_inclusive"]) - parents = locale.direct_parents.all() - while parents: - new_parents = [] - for p in parents: - p.volume_count_inclusive += 1 - p.save(update_fields=["volume_count_inclusive"]) - new_parents += list(p.direct_parents.all()) - parents = new_parents - - def get_absolute_url(self): - return f"/map/{self.pk}/" - - def update_item_lookup(self): - from ohmg.api.schemas import DocumentSchema, LayerSchema, RegionSchema - - regions = self.regions - items = { - "unprepared": [ - DocumentSchema.from_orm(i).dict() for i in self.documents.filter(prepared=False) - ], - "prepared": [ - RegionSchema.from_orm(i).dict() - for i in regions.filter(georeferenced=False, category__slug="map").exclude( - skipped=True - ) - ], - "georeferenced": [LayerSchema.from_orm(i).dict() for i in self.layers], - "nonmaps": [ - RegionSchema.from_orm(i).dict() for i in regions.exclude(category__slug="map") - ], - "skipped": [RegionSchema.from_orm(i).dict() for i in regions.filter(skipped=True)], - "processing": { - "unprep": 0, - "prep": 0, - "geo_trim": 0, - }, - } - for cat in ["unprepared", "prepared", "georeferenced", "nonmaps", "skipped"]: - items[cat] = natsorted(items[cat], key=lambda k: k["title"]) - self.item_lookup = items - - document_ct = self.documents.all().count() - unprepared_ct = len(items["unprepared"]) - region_ct = self.regions.count() - prepared_ct = len(items["prepared"]) - layer_ct = self.layers.count() - skip_ct = len(items["skipped"]) - nonmap_ct = len(items["nonmaps"]) - - completion_pct = 0 - if layer_ct > 0: - completion_pct = int((layer_ct / (unprepared_ct + prepared_ct + layer_ct)) * 100) - - multimask_ct, multimask_rank = 0, 0 - main_layerset = self.get_layerset("main-content") - if main_layerset: - main_lyrs_ct = main_layerset.get_layers().count() - else: - main_lyrs_ct = 0 - - if main_lyrs_ct != 0: - # make sure 0/0 appears at the very bottom, then 0/1, 0/2, etc. - multimask_rank = main_lyrs_ct * 0.000001 - - if main_layerset and main_layerset.multimask is not None: - multimask_ct = len(main_layerset.multimask) - if multimask_ct > 0 and main_lyrs_ct > 0: - pct = multimask_ct / main_lyrs_ct - multimask_rank += pct * 0.000001 - - self.document_ct = document_ct - self.unprepared_ct = unprepared_ct - self.region_ct = region_ct - self.prepared_ct = prepared_ct - self.layer_ct = layer_ct - self.skip_ct = skip_ct - self.nonmap_ct = nonmap_ct - self.completion_pct = completion_pct - self.multimask_ct = multimask_ct - self.multimask_rank = multimask_rank - - self.save( - update_fields=[ - "item_lookup", - "document_ct", - "unprepared_ct", - "region_ct", - "prepared_ct", - "layer_ct", - "skip_ct", - "nonmap_ct", - "completion_pct", - "multimask_ct", - "multimask_rank", - ] - ) - - def get_session_summary(self): - from ohmg.georeference.models import SessionBase - - sessions = SessionBase.objects.filter( - Q(doc2__map_id=self.pk) | Q(reg2__document__map_id=self.pk) - # | Q(lyr2__region__document__map_id=self.pk) - ).prefetch_related() - - prep_sessions = sessions.filter(type="p") - georef_sessions = sessions.filter(type="g") - - prep_ct = prep_sessions.count() - georef_ct = georef_sessions.count() - summary = { - "prep_ct": prep_ct, - "prep_contributors": get_session_user_summary(prep_sessions), - "georef_ct": georef_ct, - "georef_contributors": get_session_user_summary(georef_sessions), - } - return summary - - def save(self, set_slug=False, *args, **kwargs): - if set_slug or not self.slug: - self.slug = slugify(self.title, join_char="_") - - return super(self.__class__, self).save(*args, **kwargs) - - -class Document(models.Model): - """Documents are the individual source files that are directly attached to Maps. - They represent pages in an atlas or even just a single scan of a map.""" - - class Meta: - verbose_name_plural = " Documents" - - title = models.CharField(max_length=200, default="untitled document") - nickname = models.CharField(max_length=200, null=True, blank=True) - slug = models.SlugField(max_length=100) - map = models.ForeignKey(Map, on_delete=models.CASCADE, related_name="documents") - page_number = models.CharField(max_length=10, null=True, blank=True) - prepared = models.BooleanField(default=False) - loading_file = models.BooleanField(default=False) - file = models.FileField( - upload_to="documents", - null=True, - blank=True, - max_length=255, - ) - image_size = ArrayField( - models.IntegerField(), - size=2, - null=True, - blank=True, - ) - thumbnail = models.FileField( - upload_to="thumbnails", - null=True, - blank=True, - max_length=255, - ) - source_url = models.CharField( - max_length=255, - null=True, - blank=True, - help_text="Storing a source_url allows the file to be downloaded at any point after " - "the instance has been created.", - ) - iiif_info = models.JSONField(null=True, blank=True) - load_date = models.DateTimeField(null=True, blank=True) - - def __str__(self): - return self.title - - @property - def layers(self): - return Layer.objects.filter(region_id__in=self.regions.all().values_list("id", flat=True)) - - @property - def lock(self): - from ohmg.georeference.models import SessionLock - - ct = ContentType.objects.get_for_model(self) - locks = SessionLock.objects.filter(target_type=ct, target_id=self.pk) - if locks.exists(): - return locks[0] - else: - return None - - def load_file_from_source(self, username, overwrite=False): - log_prefix = f"{self.__str__()} |" - logger.info(f"{log_prefix} start load") - self.loading_file = True - self.save() - - if self.source_url: - src_url = self.source_url - elif self.iiif_info: - src_url = self.iiif_info.replace("info.json", "full/full/0/default.jpg") - elif self.source_url: - src_url = self.source_url - else: - logger.warning(f"{log_prefix} no source_url or iiif_info - cancelling download") - return - - if self.file != "" and not overwrite: - logger.warning(f"{log_prefix} won't overwrite existing file") - return - - src_path = Path(src_url) - tmp_img_dir = Path(settings.CACHE_DIR, "images") - tmp_img_dir.mkdir(exist_ok=True, parents=True) - tmp_path = Path(tmp_img_dir, src_path.name) - - if src_url.startswith("http"): - out_file = download_image(src_url, tmp_path, use_cache=not overwrite) - if out_file is None: - logger.error(f"can't get {src_url} -- skipping") - return - else: - if not tmp_path.exists(): - shutil.copyfile(src_path, tmp_path) - - if not src_url.endswith(".jpg"): - tmp_path = convert_img_format(tmp_path, force=True) - - if not tmp_path.exists(): - logger.error(f"{log_prefix} can't retrieve source: {src_url}. Moving to next Document.") - return - - with open(tmp_path, "rb") as new_file: - self.file.save(f"{self.slug}{tmp_path.suffix}", File(new_file)) - - self.load_date = datetime.now() - self.loading_file = False - if self.map.loaded_by is None: - self.map.loaded_by = get_user_model().objects.get(username=username) - self.map.load_date = self.load_date - self.map.save(update_fields=["loaded_by", "load_date"]) - self.save(set_thumbnail=True) - - def set_thumbnail(self): - if self.file is not None: - if self.thumbnail: - self.thumbnail.delete() - content = generate_document_thumbnail_content(self.file) - tname = f"{Path(self.file.url).stem}-doc-thumb.jpg" - self.thumbnail.save(tname, ContentFile(content)) - - def save( - self, - set_slug: bool = False, - set_thumbnail: bool = False, - set_image_size: bool = False, - skip_map_lookup_update: bool = False, - *args, - **kwargs, - ): - # attach this flag which is checked on the post_save signal receiver - self.skip_map_lookup_update = skip_map_lookup_update - - if set_thumbnail or (self.file and not self.thumbnail): - self.set_thumbnail() - - if set_slug or not self.slug: - title = self.map.__str__() - if self.page_number: - title += f" {self.map.DOCUMENT_PREFIX_ABBREVIATIONS[self.map.document_page_type]}{self.page_number}" - self.slug = slugify(title, join_char="_") - - self.title = self.map.title - if self.page_number: - self.title += f" {self.map.DOCUMENT_PREFIX_ABBREVIATIONS[self.map.document_page_type]}{self.page_number}" - - self.nickname = self.map.title - if self.page_number and self.nickname: - self.nickname = f"{self.map.document_page_type} {self.page_number}" - - if set_image_size or not self.image_size: - self.image_size = get_image_size(self.file) if self.file else None - - if self._state.adding is False: - self.prepared = self.regions.all().count() > 0 - - return super(self.__class__, self).save(*args, **kwargs) - - -class RegionCategory(models.Model): - class Meta: - verbose_name_plural = " Region Categories" - - slug = models.CharField(max_length=50) - description = models.CharField(max_length=200, null=True, blank=True) - display_name = models.CharField(max_length=50) - - def __str__(self): - return self.display_name if self.display_name else self.slug - - -class Region(models.Model): - class Meta: - verbose_name_plural = " Regions" - - title = models.CharField(max_length=200, default="untitled region") - nickname = models.CharField(max_length=200, null=True, blank=True) - slug = models.SlugField(max_length=100) - boundary = models.PolygonField( - null=True, - blank=True, - ) - document = models.ForeignKey(Document, on_delete=models.CASCADE, related_name="regions") - division_number = models.IntegerField(null=True, blank=True) - is_map = models.BooleanField(default=True) - category = models.ForeignKey(RegionCategory, on_delete=models.PROTECT, null=True, blank=True) - georeferenced = models.BooleanField(default=False) - skipped = models.BooleanField(default=False) - created_by = models.ForeignKey( - settings.AUTH_USER_MODEL, - blank=True, - null=True, - on_delete=models.SET_NULL, - related_name="region_created_by", - ) - created = models.DateTimeField(auto_now_add=True, null=True, blank=True) - last_updated = models.DateTimeField(auto_now=True, null=True, blank=True) - file = models.FileField( - upload_to="regions", - null=True, - blank=True, - max_length=255, - ) - image_size = ArrayField( - models.IntegerField(), - size=2, - null=True, - blank=True, - ) - thumbnail = models.FileField( - upload_to="thumbnails", - null=True, - blank=True, - max_length=255, - ) - - def __str__(self): - return self.title - - @cached_property - def map(self) -> Map: - return self.document.map - - @property - def transformation(self): - if hasattr(self, "gcpgroup"): - return self.gcpgroup.transformation - else: - return None - - @property - def gcps_geojson(self): - if hasattr(self, "gcpgroup"): - return self.gcpgroup.as_geojson - else: - return None - - @property - def lock(self): - from ohmg.georeference.models import SessionLock - - ct = ContentType.objects.get_for_model(self) - locks = SessionLock.objects.filter(target_type=ct, target_id=self.pk) - if locks.exists(): - return locks[0] - else: - return None - - def set_thumbnail(self): - if self.file is not None: - if self.thumbnail: - self.thumbnail.delete() - content = generate_document_thumbnail_content(self.file) - tname = f"{Path(self.file.url).stem}-reg-thumb.jpg" - self.thumbnail.save(tname, ContentFile(content)) - - def save( - self, - set_slug: bool = False, - set_thumbnail: bool = False, - set_image_size: bool = False, - skip_map_lookup_update: bool = False, - *args, - **kwargs, - ): - # attach this flag which is checked on the post_save signal receiver - self.skip_map_lookup_update = skip_map_lookup_update - - if set_thumbnail or (self.file and not self.thumbnail): - self.set_thumbnail() - - if set_slug or not self.slug: - display_name = self.document.__str__() - if self.division_number: - display_name += f" [{self.division_number}]" - self.slug = slugify(display_name, join_char="_") - - self.title = self.document.title - if self.division_number: - self.title += f" [{self.division_number}]" - - self.nickname = self.document.nickname - if self.division_number and self.nickname: - self.nickname += f" [{self.division_number}]" - - if set_image_size or not self.image_size: - self.image_size = get_image_size(self.file) if self.file else None - - return super(self.__class__, self).save(*args, **kwargs) - - -class Layer(models.Model): - class Meta: - verbose_name_plural = " Layers" - - title = models.CharField(max_length=200, default="untitled layer") - nickname = models.CharField(max_length=200, null=True, blank=True) - slug = models.SlugField(max_length=100) - region = models.OneToOneField( - Region, - on_delete=models.CASCADE, - ) - created_by = models.ForeignKey( - settings.AUTH_USER_MODEL, - blank=True, - null=True, - on_delete=models.SET_NULL, - related_name="layers_created", - ) - last_updated_by = models.ForeignKey( - settings.AUTH_USER_MODEL, - blank=True, - null=True, - on_delete=models.SET_NULL, - related_name="layers_updated", - ) - created = models.DateTimeField(auto_now_add=True, null=True, blank=True) - last_updated = models.DateTimeField(auto_now=True, null=True, blank=True) - extent = ArrayField( - models.FloatField(), - size=4, - null=True, - blank=True, - ) - mask = models.PolygonField(blank=True, null=True) - file = models.FileField( - upload_to="layers", - null=True, - blank=True, - max_length=255, - ) - thumbnail = models.FileField( - upload_to="thumbnails", - null=True, - blank=True, - max_length=255, - ) - layerset2 = models.ForeignKey( - "core.LayerSet", - null=True, - blank=True, - on_delete=models.SET_NULL, - ) - tilejson = models.JSONField(null=True, blank=True) - - def __str__(self): - return self.title - - @cached_property - def map(self) -> Map: - return self.region.document.map - - @cached_property - def centroid(self): - return Polygon.from_bbox(self.extent).centroid - - @cached_property - def file_url(self): - return get_file_url(self) - - @cached_property - def file_url_encoded(self): - """return the public url to the mosaic COG for this annotation set. If - no COG exists, return None.""" - return urllib.parse.quote(self.file_url, safe="") - - def create_xyz_url(self) -> Union[str, None]: - file_url = get_file_url(self) - if file_url: - encoded_url = urllib.parse.quote(file_url, safe="") - xyx_base = ( - f"{settings.TITILER_HOST}/cog/tiles/WebMercatorQuad/{{z}}/{{x}}/{{y}}@2x.png?" - ) - return f"{xyx_base}&url={encoded_url}" - else: - return None - - @property - def lock(self): - from ohmg.georeference.models import SessionLock - - ct = ContentType.objects.get_for_model(self) - locks = SessionLock.objects.filter(target_type=ct, target_id=self.pk) - if locks.exists(): - return locks[0] - else: - return None - - def set_thumbnail(self): - if self.file is not None: - if self.thumbnail: - self.thumbnail.delete() - content = generate_layer_thumbnail_content(self.file) - tname = f"{Path(self.file.url).stem}-lyr-thumb.jpg" - self.thumbnail.save(tname, ContentFile(content)) - - def set_layerset(self, layerset): - # if it's the same LayerSet then do nothing - if self.layerset2 == layerset: - logger.debug( - f"Layer {self.pk} already in LayerSet {layerset} ({layerset.pk}), no action" - ) - return - - # make sure to clean up the existing multimask in the current vrs if necessary - existing_obj = LayerSet.objects.get(pk=self.layerset2.pk) if self.layerset2 else None - delete_existing = False - if existing_obj: - if existing_obj.multimask and self.slug in existing_obj.multimask: - del existing_obj.multimask[self.slug] - existing_obj.save(update_fields=["multimask"]) - logger.info( - f"Layer {self.pk} removed from existing multimask in LayerSet {existing_obj.pk}" - ) - if existing_obj.get_layers().count() == 1: - delete_existing = True - self.layerset2 = layerset - self.save(update_fields=["layerset2"]) - logger.info(f"Layer {self.pk} added to LayerSet {self.layerset2} ({self.layerset2.pk})") - - if delete_existing: - msg = f"Emptied LayerSet {existing_obj} ({existing_obj.pk}) deleted" - existing_obj.delete() - logger.info(msg) - - # little patch in here to make sure the new Map objects get added to the layerset, - # before everything is shifted away from the Volume model - if not layerset.map: - layerset.map = self.region.document.map - - # save here to trigger a recalculation of the layerset's extent - layerset.save() - - def get_creators(self): - from ohmg.georeference.models import SessionBase - - sessions = SessionBase.objects.filter(Q(doc2=self.region.document) | Q(reg2=self.region)) - user_list = get_session_user_summary(sessions) - return [ - { - "id": f"https://oldinsurancemaps.net/profile/{i['name']}", - "type": "Person", - } - for i in user_list - ] - - def save( - self, - set_slug: bool = False, - set_thumbnail: bool = False, - set_extent: bool = True, - set_tilejson: bool = False, - skip_map_lookup_update: bool = False, - *args, - **kwargs, - ): - # attach this flag which is checked on the post_save signal receiver - self.skip_map_lookup_update = skip_map_lookup_update - - if set_slug or not self.slug: - self.slug = slugify(self.region.__str__(), join_char="_") - - if set_thumbnail or (self.file and not self.thumbnail): - self.set_thumbnail() - - if set_extent and self.file: - self.extent = get_extent_from_file(self.file) - - self.title = self.region.title - self.nickname = self.region.nickname - - if (set_tilejson or self.tilejson is None) and self.file: - self.tilejson = { - "tilejson": "2.2.0", - "version": "1.0.0", - "scheme": "xyz", - "tiles": [self.create_xyz_url()], - "minzoom": 10, - "maxzoom": 21, - "bounds": self.extent, - "center": [self.centroid[0], self.centroid[1], 16], - "attribution": "OldInsuranceMaps; LOC", - } - - return super(self.__class__, self).save(*args, **kwargs) - - -class LayerSetCategory(models.Model): - class Meta: - verbose_name_plural = "Layer Set Categories" - - slug = models.CharField(max_length=50) - description = models.CharField(max_length=200, null=True, blank=True) - display_name = models.CharField(max_length=50) - - def __str__(self): - return self.display_name if self.display_name else self.slug - - -class LayerSet(models.Model): - class Meta: - verbose_name_plural = "Layer Sets" - - map = models.ForeignKey( - Map, - null=True, - blank=True, - on_delete=models.CASCADE, - ) - category = models.ForeignKey( - LayerSetCategory, - on_delete=models.PROTECT, - blank=True, - null=True, - ) - multimask = models.JSONField(null=True, blank=True) - mosaic_geotiff = models.FileField( - upload_to="mosaics", - null=True, - blank=True, - max_length=255, - ) - mosaic_json = models.FileField( - upload_to="mosaics", - null=True, - blank=True, - max_length=255, - ) - extent = ArrayField( - models.FloatField(), - size=4, - null=True, - blank=True, - ) - tilejson = models.JSONField(null=True, blank=True) - - def __str__(self): - return f"{self.map} - {self.category}" - - def layer_display_list(self): - """For display in the admin interface only.""" - li = [ - f"
  • {i}
  • " for i in self.get_layers() - ] - return mark_safe("
      " + "".join(li) + "
    ") - - layer_display_list.short_description = "Layers" - - def get_layers(self) -> Iterable[Layer]: - return self.layer_set.all() - - @cached_property - def centroid(self): - return Polygon.from_bbox(self.extent).centroid - - @property - def mosaic_cog_url(self): - """return the public url to the mosaic COG for this annotation set. If - no COG exists, return None.""" - return get_file_url(self, "mosaic_geotiff") - - @cached_property - def file_url_encoded(self): - """return the public url to the mosaic COG for this annotation set. If - no COG exists, return None.""" - return urllib.parse.quote(self.mosaic_cog_url, safe="") - - def create_xyz_url(self) -> Union[str, None]: - file_url = get_file_url(self, "mosaic_geotiff") - if file_url: - encoded_url = urllib.parse.quote(file_url, safe="") - xyx_base = ( - f"{settings.TITILER_HOST}/cog/tiles/WebMercatorQuad/{{z}}/{{x}}/{{y}}@2x.png?" - ) - return f"{xyx_base}&url={encoded_url}" - else: - return None - - @property - def multimask_extent(self): - """Calculate an extent based on all layers in this layerset's - multimask. If there is no multimask, return None.""" - extent = None - if self.multimask: - feature_polygons = [] - for v in self.multimask.values(): - poly = Polygon(v["geometry"]["coordinates"][0]) - feature_polygons.append(poly) - if len(feature_polygons) > 0: - extent = MultiPolygon(feature_polygons, srid=4326).extent - return extent - - @property - def multimask_geojson(self): - if self.multimask: - multimask_geojson = {"type": "FeatureCollection", "features": []} - for layer, geojson in self.multimask.items(): - geojson["properties"] = {"layer": layer} - multimask_geojson["features"].append(geojson) - return multimask_geojson - else: - return None - - def validate_multimask_geojson(self, multimask_geojson): - errors = [] - for feature in multimask_geojson["features"]: - lyr = feature["properties"]["layer"] - try: - geom_str = json.dumps(feature["geometry"]) - g = GEOSGeometry(geom_str) - if not g.valid: - logger.warning(f"{self} | invalid mask: {lyr} - {g.valid_reason}") - errors.append((lyr, g.valid_reason)) - except Exception as e: - logger.warning(f"{self} | improper GeoJSON in multimask") - errors.append((lyr, e)) - return errors - - def update_multimask_from_geojson(self, multimask_geojson): - errors = self.validate_multimask_geojson(multimask_geojson) - if errors: - return errors - - if multimask_geojson["features"]: - self.multimask = {} - for feature in multimask_geojson["features"]: - layer_slug = feature["properties"]["layer"] - self.multimask[feature["properties"]["layer"]] = feature - - ## future patch: save mask directly to layers - layer = Layer.objects.get(slug=layer_slug, region__document__map=self.map) - layer.mask = GEOSGeometry(json.dumps(feature["geometry"])) - layer.save(skip_map_lookup_update=True) - else: - self.multimask = None - self.save(update_fields=["multimask"]) - - def save(self, set_tilejson: bool = False, *args, **kwargs): - if self._state.adding is False: - extents = self.get_layers().values_list("extent", flat=True) - layer_extents = [] - for extent in extents: - if extent: - poly = Polygon().from_bbox(extent) - layer_extents.append(poly) - if layer_extents: - combined = MultiPolygon(layer_extents) - self.extent = combined.extent - if (set_tilejson or self.tilejson is None) and self.mosaic_geotiff: - self.tilejson = { - "tilejson": "2.2.0", - "version": "1.0.0", - "scheme": "xyz", - "tiles": [self.create_xyz_url()], - "minzoom": 10, - "maxzoom": 21, - "bounds": self.extent, - "center": [self.centroid[0], self.centroid[1], 16], - "attribution": "OldInsuranceMaps; LOC", - } - - return super(self.__class__, self).save(*args, **kwargs) diff --git a/ohmg/core/models/__init__.py b/ohmg/core/models/__init__.py new file mode 100644 index 00000000..8d1d39a4 --- /dev/null +++ b/ohmg/core/models/__init__.py @@ -0,0 +1,6 @@ +from .document import Document # noqa: F401 +from .layer import Layer # noqa: F401 +from .layerset import LayerSet, LayerSetCategory # noqa: F401 +from .map import Map # noqa: F401 +from .mapgroup import MapGroup # noqa: F401 +from .region import Region, RegionCategory # noqa: F401 diff --git a/ohmg/core/models/document.py b/ohmg/core/models/document.py new file mode 100644 index 00000000..616e4992 --- /dev/null +++ b/ohmg/core/models/document.py @@ -0,0 +1,188 @@ +import logging +import shutil +from datetime import datetime +from pathlib import Path +from typing import TYPE_CHECKING + +from django.conf import settings +from django.contrib.auth import get_user_model +from django.contrib.contenttypes.models import ContentType +from django.contrib.gis.db import models +from django.contrib.postgres.fields import ArrayField +from django.core.files import File +from django.core.files.base import ContentFile + +from ..utils import ( + slugify, +) +from ..utils.image import ( + convert_img_format, + generate_document_thumbnail_content, + get_image_size, +) +from ..utils.requests import download_image + +if TYPE_CHECKING: + pass + +logger = logging.getLogger(__name__) + + +class Document(models.Model): + """Documents are the individual source files that are directly attached to Maps. + They represent pages in an atlas or even just a single scan of a map.""" + + class Meta: + verbose_name_plural = " Documents" + + title = models.CharField(max_length=200, default="untitled document") + nickname = models.CharField(max_length=200, null=True, blank=True) + slug = models.SlugField(max_length=100) + map = models.ForeignKey("core.Map", on_delete=models.CASCADE, related_name="documents") + page_number = models.CharField(max_length=10, null=True, blank=True) + prepared = models.BooleanField(default=False) + loading_file = models.BooleanField(default=False) + file = models.FileField( + upload_to="documents", + null=True, + blank=True, + max_length=255, + ) + image_size = ArrayField( + models.IntegerField(), + size=2, + null=True, + blank=True, + ) + thumbnail = models.FileField( + upload_to="thumbnails", + null=True, + blank=True, + max_length=255, + ) + source_url = models.CharField( + max_length=255, + null=True, + blank=True, + help_text="Storing a source_url allows the file to be downloaded at any point after " + "the instance has been created.", + ) + iiif_info = models.JSONField(null=True, blank=True) + load_date = models.DateTimeField(null=True, blank=True) + + def __str__(self): + return self.title + + @property + def layers(self): + from .layer import Layer + + return Layer.objects.filter(region_id__in=self.regions.all().values_list("id", flat=True)) + + @property + def lock(self): + from ohmg.georeference.models import SessionLock + + ct = ContentType.objects.get_for_model(self) + locks = SessionLock.objects.filter(target_type=ct, target_id=self.pk) + if locks.exists(): + return locks[0] + else: + return None + + def load_file_from_source(self, username, overwrite=False): + log_prefix = f"{self.__str__()} |" + logger.info(f"{log_prefix} start load") + self.loading_file = True + self.save() + + if self.source_url: + src_url = self.source_url + elif self.iiif_info: + src_url = self.iiif_info.replace("info.json", "full/full/0/default.jpg") + elif self.source_url: + src_url = self.source_url + else: + logger.warning(f"{log_prefix} no source_url or iiif_info - cancelling download") + return + + if self.file != "" and not overwrite: + logger.warning(f"{log_prefix} won't overwrite existing file") + return + + src_path = Path(src_url) + tmp_img_dir = Path(settings.CACHE_DIR, "images") + tmp_img_dir.mkdir(exist_ok=True, parents=True) + tmp_path = Path(tmp_img_dir, src_path.name) + + if src_url.startswith("http"): + out_file = download_image(src_url, tmp_path, use_cache=not overwrite) + if out_file is None: + logger.error(f"can't get {src_url} -- skipping") + return + else: + if not tmp_path.exists(): + shutil.copyfile(src_path, tmp_path) + + if not src_url.endswith(".jpg"): + tmp_path = convert_img_format(tmp_path, force=True) + + if not tmp_path.exists(): + logger.error(f"{log_prefix} can't retrieve source: {src_url}. Moving to next Document.") + return + + with open(tmp_path, "rb") as new_file: + self.file.save(f"{self.slug}{tmp_path.suffix}", File(new_file)) + + self.load_date = datetime.now() + self.loading_file = False + if self.map.loaded_by is None: + self.map.loaded_by = get_user_model().objects.get(username=username) + self.map.load_date = self.load_date + self.map.save(update_fields=["loaded_by", "load_date"]) + self.save(set_thumbnail=True) + + def set_thumbnail(self): + if self.file is not None: + if self.thumbnail: + self.thumbnail.delete() + content = generate_document_thumbnail_content(self.file) + tname = f"{Path(self.file.url).stem}-doc-thumb.jpg" + self.thumbnail.save(tname, ContentFile(content)) + + def save( + self, + set_slug: bool = False, + set_thumbnail: bool = False, + set_image_size: bool = False, + skip_map_lookup_update: bool = False, + *args, + **kwargs, + ): + # attach this flag which is checked on the post_save signal receiver + self.skip_map_lookup_update = skip_map_lookup_update + + if set_thumbnail or (self.file and not self.thumbnail): + self.set_thumbnail() + + if set_slug or not self.slug: + title = self.map.__str__() + if self.page_number: + title += f" {self.map.DOCUMENT_PREFIX_ABBREVIATIONS[self.map.document_page_type]}{self.page_number}" + self.slug = slugify(title, join_char="_") + + self.title = self.map.title + if self.page_number: + self.title += f" {self.map.DOCUMENT_PREFIX_ABBREVIATIONS[self.map.document_page_type]}{self.page_number}" + + self.nickname = self.map.title + if self.page_number and self.nickname: + self.nickname = f"{self.map.document_page_type} {self.page_number}" + + if set_image_size or not self.image_size: + self.image_size = get_image_size(self.file) if self.file else None + + if self._state.adding is False: + self.prepared = self.regions.all().count() > 0 + + return super(self.__class__, self).save(*args, **kwargs) diff --git a/ohmg/core/models/layer.py b/ohmg/core/models/layer.py new file mode 100644 index 00000000..cc7ca4df --- /dev/null +++ b/ohmg/core/models/layer.py @@ -0,0 +1,226 @@ +import logging +import urllib.parse +from pathlib import Path +from typing import TYPE_CHECKING, Union + +from django.conf import settings +from django.contrib.contenttypes.models import ContentType +from django.contrib.gis.db import models +from django.contrib.gis.geos import Polygon +from django.contrib.postgres.fields import ArrayField +from django.core.files.base import ContentFile +from django.db.models import Q +from django.utils.functional import cached_property + +from ..storages import get_file_url +from ..utils import ( + get_session_user_summary, + slugify, +) +from ..utils.image import ( + generate_layer_thumbnail_content, + get_extent_from_file, +) + +if TYPE_CHECKING: + from .map import Map + +logger = logging.getLogger(__name__) + + +class Layer(models.Model): + class Meta: + verbose_name_plural = " Layers" + + title = models.CharField(max_length=200, default="untitled layer") + nickname = models.CharField(max_length=200, null=True, blank=True) + slug = models.SlugField(max_length=100) + region = models.OneToOneField( + "core.Region", + on_delete=models.CASCADE, + ) + created_by = models.ForeignKey( + settings.AUTH_USER_MODEL, + blank=True, + null=True, + on_delete=models.SET_NULL, + related_name="layers_created", + ) + last_updated_by = models.ForeignKey( + settings.AUTH_USER_MODEL, + blank=True, + null=True, + on_delete=models.SET_NULL, + related_name="layers_updated", + ) + created = models.DateTimeField(auto_now_add=True, null=True, blank=True) + last_updated = models.DateTimeField(auto_now=True, null=True, blank=True) + extent = ArrayField( + models.FloatField(), + size=4, + null=True, + blank=True, + ) + mask = models.PolygonField(blank=True, null=True) + file = models.FileField( + upload_to="layers", + null=True, + blank=True, + max_length=255, + ) + thumbnail = models.FileField( + upload_to="thumbnails", + null=True, + blank=True, + max_length=255, + ) + layerset2 = models.ForeignKey( + "core.LayerSet", + null=True, + blank=True, + on_delete=models.SET_NULL, + ) + tilejson = models.JSONField(null=True, blank=True) + + def __str__(self): + return self.title + + @cached_property + def map(self) -> "Map": + return self.region.document.map + + @cached_property + def centroid(self): + return Polygon.from_bbox(self.extent).centroid + + @cached_property + def file_url(self): + return get_file_url(self) + + @cached_property + def file_url_encoded(self): + """return the public url to the mosaic COG for this annotation set. If + no COG exists, return None.""" + return urllib.parse.quote(self.file_url, safe="") + + def create_xyz_url(self) -> Union[str, None]: + file_url = get_file_url(self) + if file_url: + encoded_url = urllib.parse.quote(file_url, safe="") + xyx_base = ( + f"{settings.TITILER_HOST}/cog/tiles/WebMercatorQuad/{{z}}/{{x}}/{{y}}@2x.png?" + ) + return f"{xyx_base}&url={encoded_url}" + else: + return None + + @property + def lock(self): + from ohmg.georeference.models import SessionLock + + ct = ContentType.objects.get_for_model(self) + locks = SessionLock.objects.filter(target_type=ct, target_id=self.pk) + if locks.exists(): + return locks[0] + else: + return None + + def set_thumbnail(self): + if self.file is not None: + if self.thumbnail: + self.thumbnail.delete() + content = generate_layer_thumbnail_content(self.file) + tname = f"{Path(self.file.url).stem}-lyr-thumb.jpg" + self.thumbnail.save(tname, ContentFile(content)) + + def set_layerset(self, layerset): + from .layerset import LayerSet + + # if it's the same LayerSet then do nothing + if self.layerset2 == layerset: + logger.debug( + f"Layer {self.pk} already in LayerSet {layerset} ({layerset.pk}), no action" + ) + return + + # make sure to clean up the existing multimask in the current vrs if necessary + existing_obj = LayerSet.objects.get(pk=self.layerset2.pk) if self.layerset2 else None + delete_existing = False + if existing_obj: + if existing_obj.multimask and self.slug in existing_obj.multimask: + del existing_obj.multimask[self.slug] + existing_obj.save(update_fields=["multimask"]) + logger.info( + f"Layer {self.pk} removed from existing multimask in LayerSet {existing_obj.pk}" + ) + if existing_obj.get_layers().count() == 1: + delete_existing = True + self.layerset2 = layerset + self.save(update_fields=["layerset2"]) + logger.info(f"Layer {self.pk} added to LayerSet {self.layerset2} ({self.layerset2.pk})") + + if delete_existing: + msg = f"Emptied LayerSet {existing_obj} ({existing_obj.pk}) deleted" + existing_obj.delete() + logger.info(msg) + + # little patch in here to make sure the new Map objects get added to the layerset, + # before everything is shifted away from the Volume model + if not layerset.map: + layerset.map = self.region.document.map + + # save here to trigger a recalculation of the layerset's extent + layerset.save() + + def get_creators(self): + from ohmg.georeference.models import SessionBase + + sessions = SessionBase.objects.filter(Q(doc2=self.region.document) | Q(reg2=self.region)) + user_list = get_session_user_summary(sessions) + return [ + { + "id": f"https://oldinsurancemaps.net/profile/{i['name']}", + "type": "Person", + } + for i in user_list + ] + + def save( + self, + set_slug: bool = False, + set_thumbnail: bool = False, + set_extent: bool = True, + set_tilejson: bool = False, + skip_map_lookup_update: bool = False, + *args, + **kwargs, + ): + # attach this flag which is checked on the post_save signal receiver + self.skip_map_lookup_update = skip_map_lookup_update + + if set_slug or not self.slug: + self.slug = slugify(self.region.__str__(), join_char="_") + + if set_thumbnail or (self.file and not self.thumbnail): + self.set_thumbnail() + + if set_extent and self.file: + self.extent = get_extent_from_file(self.file) + + self.title = self.region.title + self.nickname = self.region.nickname + + if (set_tilejson or self.tilejson is None) and self.file: + self.tilejson = { + "tilejson": "2.2.0", + "version": "1.0.0", + "scheme": "xyz", + "tiles": [self.create_xyz_url()], + "minzoom": 10, + "maxzoom": 21, + "bounds": self.extent, + "center": [self.centroid[0], self.centroid[1], 16], + "attribution": "OldInsuranceMaps; LOC", + } + + return super(self.__class__, self).save(*args, **kwargs) diff --git a/ohmg/core/models/layerset.py b/ohmg/core/models/layerset.py new file mode 100644 index 00000000..d9b224ce --- /dev/null +++ b/ohmg/core/models/layerset.py @@ -0,0 +1,198 @@ +import json +import logging +import urllib.parse +from typing import TYPE_CHECKING, Iterable, Union + +from django.conf import settings +from django.contrib.gis.db import models +from django.contrib.gis.geos import GEOSGeometry, MultiPolygon, Polygon +from django.contrib.postgres.fields import ArrayField +from django.utils.functional import cached_property +from django.utils.safestring import mark_safe + +from ..storages import get_file_url + +if TYPE_CHECKING: + from .layer import Layer +# from .map import Map + +logger = logging.getLogger(__name__) + + +class LayerSetCategory(models.Model): + class Meta: + verbose_name_plural = "Layer Set Categories" + + slug = models.CharField(max_length=50) + description = models.CharField(max_length=200, null=True, blank=True) + display_name = models.CharField(max_length=50) + + def __str__(self): + return self.display_name if self.display_name else self.slug + + +class LayerSet(models.Model): + class Meta: + verbose_name_plural = "Layer Sets" + + map = models.ForeignKey( + "core.Map", + null=True, + blank=True, + on_delete=models.CASCADE, + ) + category = models.ForeignKey( + LayerSetCategory, + on_delete=models.PROTECT, + blank=True, + null=True, + ) + multimask = models.JSONField(null=True, blank=True) + mosaic_geotiff = models.FileField( + upload_to="mosaics", + null=True, + blank=True, + max_length=255, + ) + mosaic_json = models.FileField( + upload_to="mosaics", + null=True, + blank=True, + max_length=255, + ) + extent = ArrayField( + models.FloatField(), + size=4, + null=True, + blank=True, + ) + tilejson = models.JSONField(null=True, blank=True) + + def __str__(self): + return f"{self.map} - {self.category}" + + def layer_display_list(self): + """For display in the admin interface only.""" + li = [ + f"
  • {i}
  • " for i in self.get_layers() + ] + return mark_safe("
      " + "".join(li) + "
    ") + + layer_display_list.short_description = "Layers" + + def get_layers(self) -> Iterable["Layer"]: + return self.layer_set.all() + + @cached_property + def centroid(self): + return Polygon.from_bbox(self.extent).centroid + + @property + def mosaic_cog_url(self): + """return the public url to the mosaic COG for this annotation set. If + no COG exists, return None.""" + return get_file_url(self, "mosaic_geotiff") + + @cached_property + def file_url_encoded(self): + """return the public url to the mosaic COG for this annotation set. If + no COG exists, return None.""" + return urllib.parse.quote(self.mosaic_cog_url, safe="") + + def create_xyz_url(self) -> Union[str, None]: + file_url = get_file_url(self, "mosaic_geotiff") + if file_url: + encoded_url = urllib.parse.quote(file_url, safe="") + xyx_base = ( + f"{settings.TITILER_HOST}/cog/tiles/WebMercatorQuad/{{z}}/{{x}}/{{y}}@2x.png?" + ) + return f"{xyx_base}&url={encoded_url}" + else: + return None + + @property + def multimask_extent(self): + """Calculate an extent based on all layers in this layerset's + multimask. If there is no multimask, return None.""" + extent = None + if self.multimask: + feature_polygons = [] + for v in self.multimask.values(): + poly = Polygon(v["geometry"]["coordinates"][0]) + feature_polygons.append(poly) + if len(feature_polygons) > 0: + extent = MultiPolygon(feature_polygons, srid=4326).extent + return extent + + @property + def multimask_geojson(self): + if self.multimask: + multimask_geojson = {"type": "FeatureCollection", "features": []} + for layer, geojson in self.multimask.items(): + geojson["properties"] = {"layer": layer} + multimask_geojson["features"].append(geojson) + return multimask_geojson + else: + return None + + def validate_multimask_geojson(self, multimask_geojson): + errors = [] + for feature in multimask_geojson["features"]: + lyr = feature["properties"]["layer"] + try: + geom_str = json.dumps(feature["geometry"]) + g = GEOSGeometry(geom_str) + if not g.valid: + logger.warning(f"{self} | invalid mask: {lyr} - {g.valid_reason}") + errors.append((lyr, g.valid_reason)) + except Exception as e: + logger.warning(f"{self} | improper GeoJSON in multimask") + errors.append((lyr, e)) + return errors + + def update_multimask_from_geojson(self, multimask_geojson): + from .layer import Layer + + errors = self.validate_multimask_geojson(multimask_geojson) + if errors: + return errors + + if multimask_geojson["features"]: + self.multimask = {} + for feature in multimask_geojson["features"]: + layer_slug = feature["properties"]["layer"] + self.multimask[feature["properties"]["layer"]] = feature + + ## future patch: save mask directly to layers + layer = Layer.objects.get(slug=layer_slug, region__document__map=self.map) + layer.mask = GEOSGeometry(json.dumps(feature["geometry"])) + layer.save(skip_map_lookup_update=True) + else: + self.multimask = None + self.save(update_fields=["multimask"]) + + def save(self, set_tilejson: bool = False, *args, **kwargs): + if self._state.adding is False: + extents = self.get_layers().values_list("extent", flat=True) + layer_extents = [] + for extent in extents: + if extent: + poly = Polygon().from_bbox(extent) + layer_extents.append(poly) + if layer_extents: + combined = MultiPolygon(layer_extents) + self.extent = combined.extent + if (set_tilejson or self.tilejson is None) and self.mosaic_geotiff: + self.tilejson = { + "tilejson": "2.2.0", + "version": "1.0.0", + "scheme": "xyz", + "tiles": [self.create_xyz_url()], + "minzoom": 10, + "maxzoom": 21, + "bounds": self.extent, + "center": [self.centroid[0], self.centroid[1], 16], + "attribution": "OldInsuranceMaps; LOC", + } + + return super(self.__class__, self).save(*args, **kwargs) diff --git a/ohmg/core/models/map.py b/ohmg/core/models/map.py new file mode 100644 index 00000000..7b7e4385 --- /dev/null +++ b/ohmg/core/models/map.py @@ -0,0 +1,454 @@ +import logging + +from django.conf import settings +from django.contrib.auth.models import Group +from django.contrib.gis.db import models +from django.contrib.gis.geos import MultiPolygon, Polygon +from django.db import transaction +from django.db.models import Q +from natsort import natsorted + +from ohmg.places.models import Place + +from ..utils import ( + MONTH_CHOICES, + get_session_user_summary, + slugify, +) + +logger = logging.getLogger(__name__) + + +class Map(models.Model): + class Meta: + verbose_name_plural = " Maps" + + ACCESS_CHOICES = ( + ("none", "none"), + ("sponsor", "sponsor"), + ("any", "any"), + ) + ACCESS_LEVEL_CHOICES = ( + ("public", "Public"), + ("restricted", "Restricted"), + ("none", "None"), + ) + + DOCUMENT_PREFIX_CHOICES = ( + ("page", "page"), + ("sheet", "sheet"), + ("plate", "plate"), + ("part", "part"), + ) + + DOCUMENT_PREFIX_ABBREVIATIONS = { + "page": "p", + "sheet": "s", + "plate": "pl", + "part": "pt", + } + + identifier = models.CharField(max_length=100, primary_key=True) + slug = models.SlugField(max_length=100) + title = models.CharField(max_length=200) + year = models.IntegerField(blank=True, null=True) + month = models.IntegerField(blank=True, null=True, choices=MONTH_CHOICES) + creator = models.CharField( + max_length=200, + null=True, + blank=True, + ) + publisher = models.CharField( + max_length=200, + null=True, + blank=True, + ) + volume_number = models.CharField( + max_length=25, + null=True, + blank=True, + help_text="Volume number (or name?), if this map is included in a MapGroup.", + ) + document_page_type = models.CharField( + max_length=10, + choices=DOCUMENT_PREFIX_CHOICES, + null=True, + blank=True, + help_text="The preferred term for referring to documents within this map.", + ) + iiif_manifest = models.JSONField(null=True, blank=True) + create_date = models.DateTimeField(auto_now_add=True) + load_date = models.DateTimeField(null=True, blank=True) + loading_documents = models.BooleanField( + default=False, + help_text="true only when document files are being loaded for this map", + ) + document_sources = models.JSONField( + null=True, + blank=True, + default=dict, + ) + item_lookup = models.JSONField( + null=True, + blank=True, + default=dict, + ) + featured = models.BooleanField(default=False, help_text="show in featured section") + hidden = models.BooleanField( + default=False, + help_text="this map will be excluded from api calls (but url available directly)", + ) + locales = models.ManyToManyField( + Place, + blank=True, + ) + mapgroup = models.ForeignKey( + "core.MapGroup", + null=True, + blank=True, + on_delete=models.SET_NULL, + related_name="maps", + ) + access = models.CharField(max_length=50, choices=ACCESS_CHOICES, default="any") + access_level = models.CharField(max_length=50, choices=ACCESS_LEVEL_CHOICES, default="any") + user_access = models.ManyToManyField( + settings.AUTH_USER_MODEL, + blank=True, + related_name="maps_allowed", + ) + group_access = models.ManyToManyField( + Group, + blank=True, + related_name="maps_allowed", + ) + sponsor = models.ForeignKey( + settings.AUTH_USER_MODEL, + blank=True, + null=True, + on_delete=models.SET_NULL, + related_name="maps_sponsored", + ) + loaded_by = models.ForeignKey( + settings.AUTH_USER_MODEL, + blank=True, + null=True, + on_delete=models.CASCADE, + related_name="maps_loaded", + ) + document_ct = models.IntegerField( + default=0, + ) + unprepared_ct = models.IntegerField( + default=0, + ) + region_ct = models.IntegerField( + default=0, + ) + prepared_ct = models.IntegerField( + default=0, + ) + layer_ct = models.IntegerField( + default=0, + ) + skip_ct = models.IntegerField( + default=0, + ) + nonmap_ct = models.IntegerField( + default=0, + ) + completion_pct = models.IntegerField( + default=0, + ) + multimask_ct = models.IntegerField( + default=0, + ) + multimask_rank = models.DecimalField( + max_digits=7, + decimal_places=6, + default=0.000000, + ) + + def __str__(self): + return self.title + + @property + def regions(self): + from .region import Region + + return Region.objects.filter(document__in=self.documents.all()).order_by("title") + + @property + def layers(self): + from .layer import Layer + + return Layer.objects.filter(region__in=self.regions).order_by("title") + + @property + def extent(self): + layerset_extents = [] + for ls in self.layerset_set.all(): + if ls.extent: + poly = Polygon().from_bbox(ls.extent) + layerset_extents.append(poly) + if layerset_extents: + return MultiPolygon(layerset_extents).extent + else: + return None + + @property + def gt_exists(self): + return ( + True + if self.get_layerset("main-content") + and self.get_layerset("main-content").mosaic_geotiff + else False + ) + + @property + def mj_exists(self): + return ( + True + if self.get_layerset("main-content") and self.get_layerset("main-content").mosaic_json + else False + ) + + @property + def stats(self): + unprep_ct = len(self.item_lookup["unprepared"]) + prep_ct = len(self.item_lookup["prepared"]) + georef_ct = len(self.item_lookup["georeferenced"]) + skipped_ct = len(self.item_lookup["skipped"]) + percent = 0 + if georef_ct > 0: + percent = int((georef_ct / (unprep_ct + prep_ct + georef_ct)) * 100) + + main_layerset = self.get_layerset("main-content") + if main_layerset: + main_lyrs_ct = main_layerset.get_layers().count() + else: + main_lyrs_ct = 0 + mm_ct, mm_todo, mm_percent = 0, 0, 0 + if main_lyrs_ct != 0: + # make sure 0/0 appears at the very bottom, then 0/1, 0/2, etc. + mm_percent = main_lyrs_ct * 0.000001 + mm_display = f"0/{main_lyrs_ct}" + if main_layerset and main_layerset.multimask is not None: + mm_ct = len(main_layerset.multimask) + mm_todo = main_lyrs_ct - mm_ct + if mm_ct > 0 and main_lyrs_ct > 0: + mm_display = f"{mm_ct}/{main_lyrs_ct}" + mm_percent = mm_ct / main_lyrs_ct + mm_percent += main_lyrs_ct * 0.000001 + + return { + "unprepared_ct": unprep_ct, + "prepared_ct": prep_ct, + "georeferenced_ct": georef_ct, + "skipped_ct": skipped_ct, + "percent": percent, + "mm_ct": mm_todo, + "mm_display": mm_display, + "mm_percent": mm_percent, + } + + def get_locale(self, serialized=False): + """Returns the first locale in the list of related locales. + This is a patch in use until the frontend is ready for multiple + locales per item.""" + if len(self.locales.all()) > 0: + locale = self.locales.all()[0] + if serialized: + return locale.serialize() + else: + return locale + else: + return None + + def get_layerset(self, cat_slug: str, create: bool = False): + from .layerset import LayerSet, LayerSetCategory + + try: + layerset = LayerSet.objects.get(map=self, category__slug=cat_slug) + except LayerSet.DoesNotExist: + if create: + category = LayerSetCategory.objects.get(slug=cat_slug) + layerset = LayerSet.objects.create(map=self, category=category) + logger.debug(f"created new LayerSet: {self.pk} - {cat_slug}") + else: + layerset = None + return layerset + + def create_documents(self): + """Iterates the list of items in self.document_sources and create Document + objects for each one. If get_files=True, load files from their path. + + A document source entry should look like: + { + path: + iiif_info: + page_number: + } + """ + from .document import Document + + for source in self.document_sources: + document, created = Document.objects.get_or_create( + map=self, + page_number=source["page_number"], + ) + document.source_url = source["path"] + document.iiif_info = source["iiif_info"] + document.save(skip_map_lookup_update=True) + if created: + logger.debug(f"{document} ({document.pk}) created.") + logger.debug(f"Map {self.title} ({self.pk}) has {len(self.documents.all())} Documents") + self.update_item_lookup() + + def load_all_document_files(self, username, overwrite=False): + self.loading_documents = True + self.save() + for document in natsorted(self.documents.all(), key=lambda k: k.title): + if not document.file and not overwrite: + try: + document.load_file_from_source(username, overwrite=True) + except Exception as e: + logger.error(f"error loading document {document.pk}: {e}") + document.loading_file = False + document.save() + self.loading_documents = False + self.save() + + def remove_sheets(self): + for document in self.documents: + document.delete() + + def update_place_counts(self): + locale = self.get_locale() + if locale is not None: + with transaction.atomic(): + locale.volume_count += 1 + locale.volume_count_inclusive += 1 + locale.save(update_fields=["volume_count", "volume_count_inclusive"]) + parents = locale.direct_parents.all() + while parents: + new_parents = [] + for p in parents: + p.volume_count_inclusive += 1 + p.save(update_fields=["volume_count_inclusive"]) + new_parents += list(p.direct_parents.all()) + parents = new_parents + + def get_absolute_url(self): + return f"/map/{self.pk}/" + + def update_item_lookup(self): + from ohmg.api.schemas import DocumentSchema, LayerSchema, RegionSchema + + regions = self.regions + items = { + "unprepared": [ + DocumentSchema.from_orm(i).dict() for i in self.documents.filter(prepared=False) + ], + "prepared": [ + RegionSchema.from_orm(i).dict() + for i in regions.filter(georeferenced=False, category__slug="map").exclude( + skipped=True + ) + ], + "georeferenced": [LayerSchema.from_orm(i).dict() for i in self.layers], + "nonmaps": [ + RegionSchema.from_orm(i).dict() for i in regions.exclude(category__slug="map") + ], + "skipped": [RegionSchema.from_orm(i).dict() for i in regions.filter(skipped=True)], + "processing": { + "unprep": 0, + "prep": 0, + "geo_trim": 0, + }, + } + for cat in ["unprepared", "prepared", "georeferenced", "nonmaps", "skipped"]: + items[cat] = natsorted(items[cat], key=lambda k: k["title"]) + self.item_lookup = items + + document_ct = self.documents.all().count() + unprepared_ct = len(items["unprepared"]) + region_ct = self.regions.count() + prepared_ct = len(items["prepared"]) + layer_ct = self.layers.count() + skip_ct = len(items["skipped"]) + nonmap_ct = len(items["nonmaps"]) + + completion_pct = 0 + if layer_ct > 0: + completion_pct = int((layer_ct / (unprepared_ct + prepared_ct + layer_ct)) * 100) + + multimask_ct, multimask_rank = 0, 0 + main_layerset = self.get_layerset("main-content") + if main_layerset: + main_lyrs_ct = main_layerset.get_layers().count() + else: + main_lyrs_ct = 0 + + if main_lyrs_ct != 0: + # make sure 0/0 appears at the very bottom, then 0/1, 0/2, etc. + multimask_rank = main_lyrs_ct * 0.000001 + + if main_layerset and main_layerset.multimask is not None: + multimask_ct = len(main_layerset.multimask) + if multimask_ct > 0 and main_lyrs_ct > 0: + pct = multimask_ct / main_lyrs_ct + multimask_rank += pct * 0.000001 + + self.document_ct = document_ct + self.unprepared_ct = unprepared_ct + self.region_ct = region_ct + self.prepared_ct = prepared_ct + self.layer_ct = layer_ct + self.skip_ct = skip_ct + self.nonmap_ct = nonmap_ct + self.completion_pct = completion_pct + self.multimask_ct = multimask_ct + self.multimask_rank = multimask_rank + + self.save( + update_fields=[ + "item_lookup", + "document_ct", + "unprepared_ct", + "region_ct", + "prepared_ct", + "layer_ct", + "skip_ct", + "nonmap_ct", + "completion_pct", + "multimask_ct", + "multimask_rank", + ] + ) + + def get_session_summary(self): + from ohmg.georeference.models import SessionBase + + sessions = SessionBase.objects.filter( + Q(doc2__map_id=self.pk) | Q(reg2__document__map_id=self.pk) + # | Q(lyr2__region__document__map_id=self.pk) + ).prefetch_related() + + prep_sessions = sessions.filter(type="p") + georef_sessions = sessions.filter(type="g") + + prep_ct = prep_sessions.count() + georef_ct = georef_sessions.count() + summary = { + "prep_ct": prep_ct, + "prep_contributors": get_session_user_summary(prep_sessions), + "georef_ct": georef_ct, + "georef_contributors": get_session_user_summary(georef_sessions), + } + return summary + + def save(self, set_slug=False, *args, **kwargs): + if set_slug or not self.slug: + self.slug = slugify(self.title, join_char="_") + + return super(self.__class__, self).save(*args, **kwargs) diff --git a/ohmg/core/models/mapgroup.py b/ohmg/core/models/mapgroup.py new file mode 100644 index 00000000..dd76b384 --- /dev/null +++ b/ohmg/core/models/mapgroup.py @@ -0,0 +1,33 @@ +from django.db import models + + +class MapGroup(models.Model): + class Meta: + verbose_name_plural = " Map Groups" + + MAP_PREFIX_CHOICES = ( + ("volume", "volume"), + ("part", "part"), + ) + + MAP_PREFIX_ABBREVIATIONS = { + "volume": "Vol.", + "part": "Pt.", + } + + title = models.CharField(max_length=200) + slug = models.SlugField(max_length=100) + year_start = models.IntegerField(blank=True, null=True) + year_end = models.IntegerField(blank=True, null=True) + creator = models.CharField(max_length=200) + publisher = models.CharField(max_length=200) + map_prefix = models.CharField( + max_length=10, + choices=MAP_PREFIX_CHOICES, + null=True, + blank=True, + help_text="The preferred term for referring to maps within this map group.", + ) + + def __str__(self): + return self.title diff --git a/ohmg/core/models/region.py b/ohmg/core/models/region.py new file mode 100644 index 00000000..743f2c96 --- /dev/null +++ b/ohmg/core/models/region.py @@ -0,0 +1,155 @@ +import logging +from pathlib import Path +from typing import TYPE_CHECKING + +from django.conf import settings +from django.contrib.contenttypes.models import ContentType +from django.contrib.gis.db import models +from django.contrib.postgres.fields import ArrayField +from django.core.files.base import ContentFile +from django.utils.functional import cached_property + +from ..utils import ( + slugify, +) +from ..utils.image import ( + generate_document_thumbnail_content, + get_image_size, +) + +if TYPE_CHECKING: + from .map import Map + +logger = logging.getLogger(__name__) + + +class RegionCategory(models.Model): + class Meta: + verbose_name_plural = " Region Categories" + + slug = models.CharField(max_length=50) + description = models.CharField(max_length=200, null=True, blank=True) + display_name = models.CharField(max_length=50) + + def __str__(self): + return self.display_name if self.display_name else self.slug + + +class Region(models.Model): + class Meta: + verbose_name_plural = " Regions" + + title = models.CharField(max_length=200, default="untitled region") + nickname = models.CharField(max_length=200, null=True, blank=True) + slug = models.SlugField(max_length=100) + boundary = models.PolygonField( + null=True, + blank=True, + ) + document = models.ForeignKey("core.Document", on_delete=models.CASCADE, related_name="regions") + division_number = models.IntegerField(null=True, blank=True) + is_map = models.BooleanField(default=True) + category = models.ForeignKey(RegionCategory, on_delete=models.PROTECT, null=True, blank=True) + georeferenced = models.BooleanField(default=False) + skipped = models.BooleanField(default=False) + created_by = models.ForeignKey( + settings.AUTH_USER_MODEL, + blank=True, + null=True, + on_delete=models.SET_NULL, + related_name="region_created_by", + ) + created = models.DateTimeField(auto_now_add=True, null=True, blank=True) + last_updated = models.DateTimeField(auto_now=True, null=True, blank=True) + file = models.FileField( + upload_to="regions", + null=True, + blank=True, + max_length=255, + ) + image_size = ArrayField( + models.IntegerField(), + size=2, + null=True, + blank=True, + ) + thumbnail = models.FileField( + upload_to="thumbnails", + null=True, + blank=True, + max_length=255, + ) + + def __str__(self): + return self.title + + @cached_property + def map(self) -> "Map": + return self.document.map + + @property + def transformation(self): + if hasattr(self, "gcpgroup"): + return self.gcpgroup.transformation + else: + return None + + @property + def gcps_geojson(self): + if hasattr(self, "gcpgroup"): + return self.gcpgroup.as_geojson + else: + return None + + @property + def lock(self): + from ohmg.georeference.models import SessionLock + + ct = ContentType.objects.get_for_model(self) + locks = SessionLock.objects.filter(target_type=ct, target_id=self.pk) + if locks.exists(): + return locks[0] + else: + return None + + def set_thumbnail(self): + if self.file is not None: + if self.thumbnail: + self.thumbnail.delete() + content = generate_document_thumbnail_content(self.file) + tname = f"{Path(self.file.url).stem}-reg-thumb.jpg" + self.thumbnail.save(tname, ContentFile(content)) + + def save( + self, + set_slug: bool = False, + set_thumbnail: bool = False, + set_image_size: bool = False, + skip_map_lookup_update: bool = False, + *args, + **kwargs, + ): + # attach this flag which is checked on the post_save signal receiver + self.skip_map_lookup_update = skip_map_lookup_update + + if set_thumbnail or (self.file and not self.thumbnail): + self.set_thumbnail() + + if set_slug or not self.slug: + display_name = self.document.__str__() + if self.division_number: + display_name += f" [{self.division_number}]" + self.slug = slugify(display_name, join_char="_") + + self.title = self.document.title + if self.division_number: + self.title += f" [{self.division_number}]" + + self.nickname = self.document.nickname + if self.division_number and self.nickname: + self.nickname += f" [{self.division_number}]" + + if set_image_size or not self.image_size: + self.image_size = get_image_size(self.file) if self.file else None + + return super(self.__class__, self).save(*args, **kwargs) diff --git a/ohmg/core/utils/__init__.py b/ohmg/core/utils/__init__.py index 4948bb83..6401abe0 100644 --- a/ohmg/core/utils/__init__.py +++ b/ohmg/core/utils/__init__.py @@ -212,3 +212,18 @@ def random_alnum(size=6): STATE_NAMES = [i[0] for i in STATE_CHOICES] STATE_LOOKUP = {i[1]: i[0] for i in STATE_CHOICES} STATE_POSTAL_LOOKUP = {v: k for k, v in STATE_POSTAL.items()} + + +def get_session_user_summary(session_list): + users = session_list.values_list("user__username", flat=True) + user_dict = {} + for name in users: + user_dict[name] = user_dict.get( + name, + { + "ct": 0, + "name": name, + }, + ) + user_dict[name]["ct"] += 1 + return sorted(user_dict.values(), key=lambda item: item.get("ct"), reverse=True)