Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions google/cloud/bigtable/row.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,17 @@ def _set_cell(self, column_family_id, column, value, timestamp=None, state=None)
integer (8 bytes).

:type timestamp: :class:`datetime.datetime`
:param timestamp: (Optional) The timestamp of the operation.
:param timestamp: (Optional) The timestamp of the operation. If a
timestamp is not provided, the current system time
will be used.

:type state: bool
:param state: (Optional) The state that is passed along to
:meth:`_get_mutations`.
"""
if timestamp is None:
# Use current Bigtable server time.
timestamp_micros = mutations._SERVER_SIDE_TIMESTAMP
if timestamp is None or timestamp == mutations._SERVER_SIDE_TIMESTAMP:
# Preserve special-case values (client side timestamp generation or server side timestamp)
timestamp_micros = timestamp
Comment on lines 164 to 166
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for handling the timestamp argument is a bit condensed and could be made more explicit for better readability and maintainability. Expanding the if condition to handle the None and _SERVER_SIDE_TIMESTAMP cases separately would make the intent clearer.

Suggested change
if timestamp is None or timestamp == mutations._SERVER_SIDE_TIMESTAMP:
timestamp_micros = timestamp
if timestamp is None:
# Let SetCell handle client-side timestamp generation by passing None.
timestamp_micros = None
elif timestamp == mutations._SERVER_SIDE_TIMESTAMP:
# Use server-side timestamp.
timestamp_micros = mutations._SERVER_SIDE_TIMESTAMP

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind of agree with this, it's easier to skim. But I also think the current code is totally fine

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another way to improve readability would just be to add a comment:

if timestamp is None or timestamp == mutations._SERVER_SIDE_TIMESTAMP:
    # preserve special-case values (client-side or server-side timestamp)
    timestamp_micros = timestamp

else:
timestamp_micros = _microseconds_from_datetime(timestamp)
# Truncate to millisecond granularity.
Expand Down Expand Up @@ -351,7 +353,9 @@ def set_cell(self, column_family_id, column, value, timestamp=None):
integer (8 bytes).

:type timestamp: :class:`datetime.datetime`
:param timestamp: (Optional) The timestamp of the operation.
:param timestamp: (Optional) The timestamp of the operation. If a
timestamp is not provided, the current system time
will be used.
"""
self._set_cell(column_family_id, column, value, timestamp=timestamp, state=None)

Expand Down Expand Up @@ -651,7 +655,9 @@ def set_cell(self, column_family_id, column, value, timestamp=None, state=True):
integer (8 bytes).

:type timestamp: :class:`datetime.datetime`
:param timestamp: (Optional) The timestamp of the operation.
:param timestamp: (Optional) The timestamp of the operation. If a
timestamp is not provided, the current system time
will be used.

:type state: bool
:param state: (Optional) The state that the mutation should be
Expand Down
289 changes: 79 additions & 210 deletions google/cloud/bigtable/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
from typing import Set
import warnings

from google.api_core import timeout
from google.api_core.exceptions import GoogleAPICallError
from google.api_core.exceptions import Aborted
from google.api_core.exceptions import DeadlineExceeded
from google.api_core.exceptions import NotFound
from google.api_core.exceptions import RetryError
from google.api_core.exceptions import ServiceUnavailable
from google.api_core.exceptions import InternalServerError
from google.api_core.gapic_v1.method import DEFAULT
Expand All @@ -31,17 +30,20 @@
from google.cloud.bigtable.backup import Backup
from google.cloud.bigtable.column_family import _gc_rule_from_pb
from google.cloud.bigtable.column_family import ColumnFamily
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
from google.cloud.bigtable.data.exceptions import (
RetryExceptionGroup,
MutationsExceptionGroup,
)
from google.cloud.bigtable.data.mutations import RowMutationEntry
from google.cloud.bigtable.batcher import MutationsBatcher
from google.cloud.bigtable.batcher import FLUSH_COUNT, MAX_MUTATION_SIZE
from google.cloud.bigtable.encryption_info import EncryptionInfo
from google.cloud.bigtable.policy import Policy
from google.cloud.bigtable.row import AppendRow
from google.cloud.bigtable.row import ConditionalRow
from google.cloud.bigtable.row import DirectRow
from google.cloud.bigtable.row_data import (
PartialRowsData,
_retriable_internal_server_error,
)
from google.cloud.bigtable.row_data import PartialRowsData
from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS
from google.cloud.bigtable.row_set import RowSet
from google.cloud.bigtable.row_set import RowRange
Expand All @@ -52,6 +54,7 @@
from google.cloud.bigtable.admin.types import (
bigtable_table_admin as table_admin_messages_v2_pb2,
)
from google.rpc import code_pb2, status_pb2

# Maximum number of mutations in bulk (MutateRowsRequest message):
# (https://cloud.google.com/bigtable/docs/reference/data/rpc/
Expand Down Expand Up @@ -714,6 +717,9 @@ def mutate_rows(self, rows, retry=DEFAULT_RETRY, timeout=DEFAULT):
specify a ``retry`` strategy of "do-nothing", a deadline of ``0.0``
can be specified.

If a deadline of ``None`` is specified, the deadline defaults to
a table-default of 600 seconds (10 minutes).

:type rows: list
:param rows: List or other iterable of :class:`.DirectRow` instances.

Expand All @@ -731,18 +737,78 @@ def mutate_rows(self, rows, retry=DEFAULT_RETRY, timeout=DEFAULT):
:returns: A list of response statuses (`google.rpc.status_pb2.Status`)
corresponding to success or failure of each row mutation
sent. These will be in the same order as the `rows`.

:raise: ValueError: If a row entry has no mutations, or too many mutations
"""
if timeout is DEFAULT:
timeout = self.mutation_timeout

retryable_mutate_rows = _RetryableMutateRowsWorker(
self._instance._client,
self.name,
rows,
app_profile_id=self._app_profile_id,
timeout=timeout,
retryable_errors = RETRYABLE_MUTATION_ERRORS

# The data client cannot take in zero or null values for deadline, so we set it to
# the default if that is the case.
if retry.deadline is None:
operation_timeout = TABLE_DEFAULT.MUTATE_ROWS

# To adhere to the retry strategy of do-nothing being achievable with a deadline
# of 0.0, we modify the retryable errors to be empty if such a deadline is passed.
elif retry.deadline == 0:
operation_timeout = TABLE_DEFAULT.MUTATE_ROWS
retryable_errors = []
else:
operation_timeout = retry.deadline

attempt_timeout = timeout
mutation_entries = [
RowMutationEntry(row.row_key, row._get_mutations()) for row in rows
]
return_statuses = [status_pb2.Status(code=code_pb2.Code.OK)] * len(
mutation_entries
) # By default, return status OKs for everything

try:
self._table_impl.bulk_mutate_rows(
mutation_entries,
operation_timeout=operation_timeout,
attempt_timeout=attempt_timeout,
retryable_errors=retryable_errors,
)
except MutationsExceptionGroup as mut_exc_group:
# Exceptions are handled as follows:
#
# 1. Each exception in the error group is a FailedMutationEntryError, and its
# cause is either a single exception or a RetryExceptionGroup consisting of
# multiple exceptions.
#
# 2. In the case of a single exception, if the error does not have a gRPC status
# code, we return a status code of UNKNOWN.
#
# 3. In the case of a RetryExceptionGroup, we take the terminal (last) exception
# in the group and convert it the same way as a single exception.
for error in mut_exc_group.exceptions:
cause = error.__cause__
if isinstance(cause, RetryExceptionGroup):
return_statuses[error.index] = self._get_status(
cause.exceptions[-1]
)
else:
return_statuses[error.index] = self._get_status(cause)

return return_statuses

@staticmethod
def _get_status(error):
    """Translate an exception into a ``google.rpc.status_pb2.Status``.

    :type error: Exception
    :param error: The exception to convert.

    :rtype: :class:`google.rpc.status_pb2.Status`
    :returns: A status built from the error's gRPC status code, message and
              details when ``error`` is a ``GoogleAPICallError`` that
              carries a gRPC status code; otherwise a status with code
              ``UNKNOWN`` and ``str(error)`` as its message.
    """
    carries_grpc_code = (
        isinstance(error, GoogleAPICallError)
        and error.grpc_status_code is not None
    )

    # Anything without a usable gRPC code is reported as UNKNOWN.
    if not carries_grpc_code:
        return status_pb2.Status(
            code=code_pb2.Code.UNKNOWN,
            message=str(error),
        )

    # The first element of the status code's ``value`` is used as the code
    # (presumably the numeric gRPC code -- confirm against grpc.StatusCode).
    return status_pb2.Status(
        code=error.grpc_status_code.value[0],
        message=error.message,
        details=error.details,
    )
return retryable_mutate_rows(retry=retry)

def sample_row_keys(self):
"""Read a sample of row keys in the table.
Expand Down Expand Up @@ -1070,133 +1136,6 @@ def restore(self, new_table_id, cluster_id=None, backup_id=None, backup_name=Non
)


class _RetryableMutateRowsWorker(object):
    """Callable worker that mutates rows and re-sends transient failures.

    Every invocation only sends the rows that have not been attempted yet or
    whose most recent status is considered retryable. Once every row has a
    non-retryable status (success or permanent failure), further invocations
    send nothing and simply return the recorded statuses.
    """

    def __init__(self, client, table_name, rows, app_profile_id=None, timeout=None):
        self.client = client
        self.table_name = table_name
        self.app_profile_id = app_profile_id
        self.timeout = timeout
        self.rows = rows
        # One slot per row; ``None`` means "not attempted yet".
        self.responses_statuses = [None] * len(self.rows)

    def __call__(self, retry=DEFAULT_RETRY):
        """Mutate all rows, retrying the ones that fail transiently.

        Rows with transient errors are retried until all rows succeed or the
        ``deadline`` specified in ``retry`` is reached.

        :param retry: (Optional) Retry wrapper applied to the attempt
                      function. A falsy value disables retrying.

        :rtype: list
        :returns: A list of response statuses (`google.rpc.status_pb2.Status`)
                  corresponding to success or failure of each row mutation
                  sent. These will be in the same order as the ``rows``.
        """
        if retry:
            attempt = retry(self._do_mutate_retryable_rows)
        else:
            attempt = self._do_mutate_retryable_rows

        try:
            attempt()
        except (_BigtableRetryableError, RetryError):
            # _BigtableRetryableError: no retry strategy was used and a
            # retryable error occurred on a mutation.
            # RetryError: the retry deadline was reached.
            # Either way the statuses gathered so far are the result.
            pass

        return self.responses_statuses

    @staticmethod
    def _is_retryable(status):
        """Return True for a missing status or one with a retryable code."""
        if status is None:
            return True
        return status.code in RETRYABLE_CODES

    def _do_mutate_retryable_rows(self):
        """Send every row that is still eligible for a (re)try.

        A row is eligible if it has not been tried or if it resulted in a
        transient error in a previous call.

        :rtype: list
        :return: The responses statuses, which is a list of
                 :class:`~google.rpc.status_pb2.Status`.
        :raises: One of the following:

                 * :exc:`~.table._BigtableRetryableError` if any
                   row returned a transient error.
                 * :exc:`RuntimeError` if the number of responses doesn't
                   match the number of rows that were retried
        """
        # Positions (into ``self.rows``) of the rows that still need sending.
        pending_positions = [
            position
            for position, status in enumerate(self.responses_statuses)
            if self._is_retryable(status)
        ]

        if not pending_positions:
            # All mutations are either successful or non-retryable now.
            return self.responses_statuses

        pending_rows = [self.rows[position] for position in pending_positions]

        entries = _compile_mutation_entries(self.table_name, pending_rows)
        data_client = self.client.table_data_client

        call_options = {}
        if self.timeout is not None:
            call_options["timeout"] = timeout.ExponentialTimeout(
                deadline=self.timeout
            )

        try:
            responses = data_client.mutate_rows(
                table_name=self.table_name,
                entries=entries,
                app_profile_id=self.app_profile_id,
                retry=None,
                **call_options
            )
        except RETRYABLE_MUTATION_ERRORS as exc:
            # An exception listed in RETRYABLE_MUTATION_ERRORS on the initial
            # call is treated as retryable and wrapped accordingly -- except
            # an InternalServerError that the helper does not classify as
            # retriable, which is re-raised unchanged.
            helper_says_retriable = _retriable_internal_server_error(exc)
            if not helper_says_retriable and isinstance(exc, InternalServerError):
                raise
            raise _BigtableRetryableError

        received = 0
        retryable_seen = 0
        for response in responses:
            for entry in response.entries:
                received += 1
                # ``entry.index`` is relative to the rows sent in this call;
                # map it back to the position in the full row list.
                position = pending_positions[entry.index]
                self.responses_statuses[position] = entry.status
                if self._is_retryable(entry.status):
                    retryable_seen += 1
                if entry.status.code == 0:
                    # Status code 0: the row's mutations are cleared.
                    self.rows[position].clear()

        if len(pending_rows) != received:
            raise RuntimeError(
                "Unexpected number of responses",
                received,
                "Expected",
                len(pending_rows),
            )

        if retryable_seen:
            raise _BigtableRetryableError

        return self.responses_statuses


class ClusterState(object):
"""Representation of a Cluster State.

Expand Down Expand Up @@ -1343,73 +1282,3 @@ def _create_row_request(
row_set._update_message_request(message)

return message


def _compile_mutation_entries(table_name, rows):
    """Create the list of mutation entries for a bulk mutation request.

    :type table_name: str
    :param table_name: The name of the table to write to.

    :type rows: list
    :param rows: List or other iterable of :class:`.DirectRow` instances.

    :rtype: List[:class:`data_messages_v2_pb2.MutateRowsRequest.Entry`]
    :returns: entries corresponding to the inputs.
    :raises: :exc:`~.table.TooManyMutationsError` if the number of mutations is
             greater than the max (``_MAX_BULK_MUTATIONS``). Errors raised by
             ``_check_row_table_name`` and ``_check_row_type`` for an
             individual row propagate unchanged.
    """
    # NOTE: the docstring must stay a plain string literal. Wrapping it in
    # ``.format(...)`` turns it into an ordinary expression statement, which
    # leaves ``__doc__`` as None and re-runs the formatting on every call.
    entries = []
    mutations_count = 0
    entry_klass = data_messages_v2_pb2.MutateRowsRequest.Entry

    for row in rows:
        # Validate each row before reading its mutations.
        _check_row_table_name(table_name, row)
        _check_row_type(row)
        mutations = row._get_mutation_pbs()
        entries.append(entry_klass(row_key=row.row_key, mutations=mutations))
        mutations_count += len(mutations)

    # The limit applies to the total across all rows, so it is checked once
    # after every row has been validated and compiled.
    if mutations_count > _MAX_BULK_MUTATIONS:
        raise TooManyMutationsError(
            "Maximum number of mutations is %s" % (_MAX_BULK_MUTATIONS,)
        )
    return entries


def _check_row_table_name(table_name, row):
    """Verify that a row is not bound to a different table.

    :type table_name: str
    :param table_name: The name of the table.

    :type row: :class:`~google.cloud.bigtable.row.Row`
    :param row: An instance of :class:`~google.cloud.bigtable.row.Row`
                subclasses.

    :raises: :exc:`~.table.TableMismatchError` if the row does not belong to
             the table.
    """
    # A row that is not attached to any table is accepted as-is.
    if row.table is None:
        return

    if row.table.name == table_name:
        return

    details = (row.row_key, row.table.name, table_name)
    raise TableMismatchError(
        "Row %s is a part of %s table. Current table: %s" % details
    )


def _check_row_type(row):
    """Verify that a row is a :class:`.DirectRow`.

    :type row: :class:`~google.cloud.bigtable.row.Row`
    :param row: An instance of :class:`~google.cloud.bigtable.row.Row`
                subclasses.

    :raises: :class:`TypeError <exceptions.TypeError>` if the row is not an
             instance of DirectRow.
    """
    if isinstance(row, DirectRow):
        return

    # Any other row type is rejected for bulk processing.
    raise TypeError(
        "Bulk processing can not be applied for "
        "conditional or append mutations."
    )
)
Loading
Loading