@@ -26,6 +26,7 @@
import com.google.cloud.bigtable.hbase.wrappers.BulkMutationWrapper;
import com.google.cloud.bigtable.hbase.wrappers.DataClientWrapper;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -127,6 +128,14 @@ public long getWriteBufferSize() {
return this.settings.getBatchingMaxRequestSize();
}

public int getMaxRowKeyCount() {
return this.settings.getBulkMaxRowCount();
}

public Duration getAutoFlushInterval() {
return this.settings.getAutoFlushInterval();
}

public List<ApiFuture<?>> mutate(List<? extends Mutation> mutations) {
closedReadLock.lock();
try {
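Review note: the two new getters surface the veneer client's bulk-mutation limits (max row keys per batch, auto-flush delay) so the HBase layer can implement the `BufferedMutator` accessors added later in this diff. As a hedged sketch, these limits can be tuned through bigtable-hbase configuration keys before connecting; the `BigtableOptionsFactory` constants and the values below are assumptions to verify against your client version:

```java
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;

public class BatchingTuningSketch {
  public static Connection connect() throws Exception {
    Configuration conf = BigtableConfiguration.configure("my-project", "my-instance");
    // Cap on row keys per bulk request; read back via getMaxRowKeyCount().
    conf.set(BigtableOptionsFactory.BIGTABLE_BULK_MAX_ROW_KEY_COUNT, "1000");
    // Auto-flush delay in milliseconds; read back via getAutoFlushInterval().
    conf.set(BigtableOptionsFactory.BIGTABLE_BULK_AUTOFLUSH_MS_KEY, "500");
    return BigtableConfiguration.connect(conf);
  }
}
```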

@@ -27,6 +27,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;

@@ -101,6 +102,8 @@ public int getTtlSecondsForBackup() {

public abstract long getBatchingMaxRequestSize();

public abstract Duration getAutoFlushInterval();

// This is equivalent to allow server-side timestamp.
public abstract boolean isRetriesWithoutTimestampAllowed();


@@ -241,6 +241,15 @@ public long getBatchingMaxRequestSize() {
return batchingMaxMemory;
}

@Override
public java.time.Duration getAutoFlushInterval() {
return dataSettings
.getStubSettings()
.bulkMutateRowsSettings()
.getBatchingSettings()
.getDelayThresholdDuration();
}

@Override
public boolean isRetriesWithoutTimestampAllowed() {
return allowRetriesWithoutTimestamp;
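Review note: `getAutoFlushInterval()` above reads the delay threshold from the gax `BatchingSettings` behind `bulkMutateRowsSettings()`, using the newer `java.time.Duration` accessor rather than the deprecated threeten one. A minimal sketch of the setting being read, assuming the `Duration`-flavored gax builder method exists alongside the getter used in this diff; the threshold values are illustrative:

```java
import com.google.api.gax.batching.BatchingSettings;
import java.time.Duration;

public class DelayThresholdSketch {
  public static void main(String[] args) {
    BatchingSettings batching =
        BatchingSettings.newBuilder()
            .setDelayThresholdDuration(Duration.ofSeconds(1)) // auto-flush interval
            .setElementCountThreshold(100L)
            .setRequestByteThreshold(1L << 20)
            .build();
    // The value getAutoFlushInterval() ultimately returns.
    System.out.println(batching.getDelayThresholdDuration()); // PT1S
  }
}
```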

@@ -26,6 +26,7 @@
import com.google.cloud.bigtable.beam.validation.SyncTableJob;
import com.google.cloud.bigtable.beam.validation.SyncTableJob.SyncTableOptions;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import java.io.BufferedReader;
@@ -54,6 +55,7 @@
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -122,9 +124,13 @@ public void setup() throws Exception {

uploadFixture(gcsUtil, SNAPSHOT_FIXTURE_NAME, fixtureDir);

// Disable CSM to reduce noise in the test output
Configuration config =
BigtableConfiguration.configure(properties.getProjectId(), properties.getInstanceId());
config.set(BigtableOptionsFactory.BIGTABLE_ENABLE_CLIENT_SIDE_METRICS, "false");

// Bigtable config
-connection =
-    BigtableConfiguration.connect(properties.getProjectId(), properties.getInstanceId());
+connection = BigtableConfiguration.connect(config);
// TODO: use timebased names to allow for gc
tableId = "test_" + UUID.randomUUID();


@@ -26,6 +26,7 @@
import com.google.cloud.bigtable.beam.test_env.EnvSetup;
import com.google.cloud.bigtable.beam.test_env.TestProperties;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -112,6 +113,8 @@ public void setUp() throws IOException {
properties
.getAdminEndpoint()
.ifPresent(endpoint -> config.set(BIGTABLE_ADMIN_HOST_KEY, endpoint));
// Disable CSM to reduce noise in the test output
config.set(BigtableOptionsFactory.BIGTABLE_ENABLE_CLIENT_SIDE_METRICS, "false");

connection = BigtableConfiguration.connect(config);


@@ -22,6 +22,7 @@
import com.google.cloud.bigtable.beam.test_env.EnvSetup;
import com.google.cloud.bigtable.beam.test_env.TestProperties;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.truth.Correspondence;
@@ -38,6 +39,7 @@
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
@@ -89,9 +91,13 @@ public void setup() throws Exception {
properties.applyTo(gcpOptions);
gcsUtil = new GcsUtil.GcsUtilFactory().create(gcpOptions);

// Disable CSM to reduce noise in the test output
Configuration config =
BigtableConfiguration.configure(properties.getProjectId(), properties.getInstanceId());
config.set(BigtableOptionsFactory.BIGTABLE_ENABLE_CLIENT_SIDE_METRICS, "false");

// Bigtable config
-connection =
-    BigtableConfiguration.connect(properties.getProjectId(), properties.getInstanceId());
+connection = BigtableConfiguration.connect(config);
// TODO: support endpoints
}


@@ -188,6 +188,12 @@ limitations under the License.
<targetDependencies>
<targetDependency>org.apache.hbase:hbase-mapreduce</targetDependency>
</targetDependencies>
<ignoredDependencies>
<!-- TODO: version mismatches after upgrading hbase client to 2.6.4 -->
<ignoredDependency>javax.activation:activation</ignoredDependency>
<ignoredDependency>javax.xml.bind:jaxb-api</ignoredDependency>
<ignoredDependency>javax.xml.stream:stax-api</ignoredDependency>
</ignoredDependencies>
</configuration>
</execution>
</executions>

@@ -247,6 +247,7 @@ limitations under the License.
<exclude>org.apache.htrace:htrace-core4</exclude>
<exclude>commons-logging:commons-logging</exclude>
<exclude>org.apache.yetus:audience-annotations</exclude>
<exclude>jakarta.activation:jakarta.activation-api</exclude>
</excludes>
</artifactSet>
<relocations>

@@ -695,7 +695,13 @@ public int getStoreFileCount() {
public Size getStoreFileSize() {
return new Size(size, Unit.BYTE);
}

@Override
public Size getMemStoreSize() {
return new Size(size, Unit.BYTE);
}
}

/** Handler for unsupported operations for generating Admin class at runtime. */
public static class UnsupportedOperationsHandler implements InvocationHandler {
@Override

@@ -24,7 +24,9 @@
import com.google.cloud.bigtable.hbase.wrappers.BigtableHBaseSettings;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
@@ -88,6 +90,24 @@ public long getWriteBufferSize() {
return helper.getWriteBufferSize();
}

/** {@inheritDoc} */
@Override
public int getMaxMutations() {
return helper.getMaxRowKeyCount();
}

/** {@inheritDoc} */
@Override
public long getPeriodicalFlushTimeout(TimeUnit unit) {
return unit.convert(helper.getAutoFlushInterval().toNanos(), TimeUnit.NANOSECONDS);
}

/** {@inheritDoc} */
@Override
public Map<String, byte[]> getRequestAttributes() {
throw new UnsupportedOperationException("not implemented");
}

/** {@inheritDoc} */
@Override
public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
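Review note: `getPeriodicalFlushTimeout(TimeUnit)` converts the client's `Duration`-based auto-flush interval into whichever unit the caller requests, routing through nanoseconds so sub-millisecond intervals are not truncated to zero, while `getRequestAttributes()` deliberately throws because request attributes are unsupported here. A hedged usage sketch against the HBase 2.6 `BufferedMutator` surface; project, instance, and table names are illustrative:

```java
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;

public class BufferedMutatorLimitsSketch {
  public static void main(String[] args) throws Exception {
    try (Connection connection = BigtableConfiguration.connect("my-project", "my-instance");
        BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf("my-table"))) {
      long bufferBytes = mutator.getWriteBufferSize(); // batching max request size
      int maxMutations = mutator.getMaxMutations(); // bulk max row-key count
      long flushMillis = mutator.getPeriodicalFlushTimeout(TimeUnit.MILLISECONDS);
      System.out.printf(
          "buffer=%d maxMutations=%d flushMs=%d%n", bufferBytes, maxMutations, flushMillis);
      // mutator.getRequestAttributes() would throw UnsupportedOperationException here.
    }
  }
}
```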

@@ -22,6 +22,7 @@
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
@@ -58,7 +59,13 @@ public BigtableConnection(Configuration conf) throws IOException {

public BigtableConnection(Configuration conf, ExecutorService pool, User user)
throws IOException {
-super(conf);
+super(conf, false, pool, user);
}

public BigtableConnection(
Configuration conf, ExecutorService pool, User user, Map<String, byte[]> connectionAttributes)
throws IOException {
super(conf, false, pool, user);
}

/**
@@ -111,6 +118,11 @@ public TableBuilder setOperationTimeout(int arg0) {
return this;
}

@Override
public TableBuilder setRequestAttribute(String s, byte[] bytes) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public Table build() {
try {
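Review note: the added four-argument constructor mirrors the connection-attributes overload that HBase 2.6's `ConnectionFactory` locates reflectively; the attribute map is accepted purely for signature compatibility and dropped, and `TableBuilder.setRequestAttribute` likewise rejects per-request attributes. A hedged sketch of the expected call shape; the attribute key/value and the `hbase2_x` package are assumptions:

```java
import com.google.cloud.bigtable.hbase2_x.BigtableConnection; // package assumed
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.security.User;

public class ConnectionAttributesSketch {
  // Mirrors the constructor HBase 2.6 is expected to invoke reflectively;
  // bigtable-hbase accepts the map for compatibility and ignores it.
  static Connection open(Configuration conf, ExecutorService pool, User user) throws IOException {
    Map<String, byte[]> attrs =
        Collections.singletonMap("caller-id", "demo".getBytes(StandardCharsets.UTF_8));
    return new BigtableConnection(conf, pool, user, attrs);
  }
}
```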

@@ -37,6 +37,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -76,6 +77,11 @@ public BigtableAsyncConnection(Configuration conf) throws IOException {
this(conf, null, null, null);
}

public BigtableAsyncConnection(
Configuration conf, final User ignoredUser, Map<String, byte[]> ignored) throws IOException {
this(conf, null, null, null);
}

// This constructor is used in HBase 2 version < 2.3
public BigtableAsyncConnection(
Configuration conf, Object ignoredAsyncRegistry, String ignoredClusterId, User ignoredUser)
@@ -201,6 +207,11 @@ public AsyncBufferedMutatorBuilder setMaxKeyValueSize(int i) {
return this;
}

@Override
public AsyncBufferedMutatorBuilder setMaxMutations(int i) {
return this;
}

@Override
public AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int arg0) {
return this;
@@ -259,6 +270,12 @@ public AsyncTableBuilder<AdvancedScanResultConsumer> setStartLogErrorsCnt(int arg0) {
return this;
}

@Override
public AsyncTableBuilder<AdvancedScanResultConsumer> setRequestAttribute(
String s, byte[] bytes) {
return this;
}

@Override
public AsyncTableBuilder<AdvancedScanResultConsumer> setScanTimeout(
long arg0, TimeUnit arg1) {
@@ -304,6 +321,11 @@ public AsyncTableBuilder<AdvancedScanResultConsumer> setRetryPauseForServerOverloaded(
return this;
}

@Override
public AsyncTableBuilder<AdvancedScanResultConsumer> setMaxRetries(int maxRetries) {
return this;
}

@Override
public AsyncTable build() {
return new BigtableAsyncTable(BigtableAsyncConnection.this, createAdapter(tableName));
@@ -371,6 +393,11 @@ public AsyncTableBuilder<ScanResultConsumer> setStartLogErrorsCnt(int arg0) {
return this;
}

@Override
public AsyncTableBuilder<ScanResultConsumer> setRequestAttribute(String s, byte[] bytes) {
return this;
}

@Override
public AsyncTableBuilder<ScanResultConsumer> setWriteRpcTimeout(long arg0, TimeUnit arg1) {
return this;
Expand All @@ -381,6 +408,11 @@ public AsyncTableBuilder<ScanResultConsumer> setRetryPauseForServerOverloaded(
long l, TimeUnit timeUnit) {
return this;
}

@Override
public AsyncTableBuilder<ScanResultConsumer> setMaxRetries(int maxRetries) {
return this;
}
};
}
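Review note: the new `setMaxMutations`, `setMaxRetries`, and `setRequestAttribute` overrides satisfy builder methods added in HBase 2.6 but intentionally return `this` without storing anything, since retry and batching policy come from the veneer client settings. A hedged sketch of what that means for callers; the table name and attribute are illustrative:

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ScanResultConsumer;

public class AsyncBuilderNoOpsSketch {
  static AsyncTable<ScanResultConsumer> build(AsyncConnection conn, ExecutorService pool) {
    return conn.getTableBuilder(TableName.valueOf("my-table"), pool)
        .setMaxRetries(3) // accepted, but a no-op in bigtable-hbase
        .setRequestAttribute("caller-id", "demo".getBytes(StandardCharsets.UTF_8)) // no-op
        .build();
  }
}
```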


@@ -20,6 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.security.User;

/** Implementation for `hbase.client.registry.impl` for HBase >= 2.3 */
@InternalApi
Expand All @@ -32,6 +33,10 @@ public BigtableConnectionRegistry(Configuration ignored) {
// noop
}

public BigtableConnectionRegistry(Configuration ignored, User user) {
// noop
}

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
throw new UnsupportedOperationException();

pom.xml (1 addition & 1 deletion)
@@ -66,7 +66,7 @@ limitations under the License.

<!-- hbase dependency versions -->
<hbase1.version>1.7.2</hbase1.version>
-<hbase2.version>2.5.7-hadoop3</hbase2.version>
+<hbase2.version>2.6.4-hadoop3</hbase2.version>
<hbase2-testing-util-mockito.version>2.28.2</hbase2-testing-util-mockito.version>

<!-- bytebuddy dependency version for resolving hbase backward incompatible changes -->