Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
648 changes: 620 additions & 28 deletions e2e/adapter/adapter_with_maestro.go

Large diffs are not rendered by default.

55 changes: 51 additions & 4 deletions pkg/client/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,38 @@ func NewClient() (*Client, error) {

// DeleteNamespaceAndWait deletes a namespace and waits for it to be fully removed
func (c *Client) DeleteNamespaceAndWait(ctx context.Context, namespace string) error {
// Delete namespace
err := c.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{})
// Check if namespace exists first
_, err := c.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return nil // Already deleted
}

// Delete all resources in the namespace to speed up cleanup
// This helps avoid timeout issues when namespaces have many resources
gracePeriod := int64(0)
propagationPolicy := metav1.DeletePropagationForeground
deleteOpts := metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriod,
PropagationPolicy: &propagationPolicy,
}

// Delete deployments
_ = c.AppsV1().Deployments(namespace).DeleteCollection(ctx, deleteOpts, metav1.ListOptions{})

// Delete jobs
_ = c.BatchV1().Jobs(namespace).DeleteCollection(ctx, deleteOpts, metav1.ListOptions{})

// Delete configmaps
_ = c.CoreV1().ConfigMaps(namespace).DeleteCollection(ctx, deleteOpts, metav1.ListOptions{})

// Delete pods
_ = c.CoreV1().Pods(namespace).DeleteCollection(ctx, deleteOpts, metav1.ListOptions{})

// Delete namespace with foreground propagation to ensure resources are cleaned up
nsDeleteOpts := metav1.DeleteOptions{
PropagationPolicy: &propagationPolicy,
}
err = c.CoreV1().Namespaces().Delete(ctx, namespace, nsDeleteOpts)
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete namespace %s: %w", namespace, err)
}
Expand All @@ -56,8 +86,8 @@ func (c *Client) DeleteNamespaceAndWait(ctx context.Context, namespace string) e
Duration: 500 * time.Millisecond,
Factor: 1.5,
Jitter: 0.1,
Steps: 20,
Cap: 10 * time.Second, // Cap individual retries at 10s for ~2.4 min total timeout
Steps: 30, // Increased from 20 to give more time
Cap: 15 * time.Second, // Increased cap for better handling of stuck resources
}
err = wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
_, err := c.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
Expand Down Expand Up @@ -88,6 +118,23 @@ func (c *Client) FetchNamespace(ctx context.Context, name string) (*corev1.Names
return ns, nil
}

// FindNamespacesByPrefix finds all namespaces whose name starts with the given prefix
func (c *Client) FindNamespacesByPrefix(ctx context.Context, prefix string) ([]string, error) {
namespaces, err := c.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to list namespaces: %w", err)
}

var matchingNamespaces []string
for _, ns := range namespaces.Items {
if len(ns.Name) >= len(prefix) && ns.Name[:len(prefix)] == prefix {
matchingNamespaces = append(matchingNamespaces, ns.Name)
}
}

return matchingNamespaces, nil
}

// FetchConfigMap gets a configmap by name in the specified namespace
func (c *Client) FetchConfigMap(ctx context.Context, namespace, name string) (*corev1.ConfigMap, error) {
cm, err := c.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
Expand Down
90 changes: 90 additions & 0 deletions pkg/client/maestro/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,96 @@ func (c *Client) FindResourceBundleByClusterID(ctx context.Context, clusterID st
return nil, fmt.Errorf("no resource bundle found for cluster ID: %s", clusterID)
}

// FindAllResourceBundlesByClusterID finds all resource bundles for a cluster ID
// Returns all matching resource bundles (multiple adapters may create ManifestWorks for the same cluster)
func (c *Client) FindAllResourceBundlesByClusterID(ctx context.Context, clusterID string) ([]ResourceBundle, error) {
// Use labelSelector query parameter to filter server-side
labelSelector := fmt.Sprintf("%s=%s", client.KeyClusterID, clusterID)
apiURL := fmt.Sprintf("%s%s?labelSelector=%s",
c.baseURL,
resourceBundlesBasePath,
url.QueryEscape(labelSelector))

req, err := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}

resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to execute request: %w", err)
}
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body))
}

var result ResourceBundleList
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}

// Filter and return all matching resource bundles
var bundles []ResourceBundle
for i := range result.Items {
if result.Items[i].Metadata.Labels != nil &&
result.Items[i].Metadata.Labels[client.KeyClusterID] == clusterID {
bundles = append(bundles, result.Items[i])
}
}

return bundles, nil
}

// FindResourceBundlesByAdapterName finds all resource bundles created by a specific adapter
// Uses the maestro.io/source-id label to filter by adapter name
func (c *Client) FindResourceBundlesByAdapterName(ctx context.Context, adapterName string) ([]ResourceBundle, error) {
// Use labelSelector query parameter to filter server-side by adapter source-id
labelSelector := fmt.Sprintf("maestro.io/source-id=%s", adapterName)
apiURL := fmt.Sprintf("%s%s?labelSelector=%s",
c.baseURL,
resourceBundlesBasePath,
url.QueryEscape(labelSelector))

req, err := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}

resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to execute request: %w", err)
}
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body))
}

var result ResourceBundleList
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}

// Filter and return all matching resource bundles
var bundles []ResourceBundle
for i := range result.Items {
if result.Items[i].Metadata.Labels != nil &&
result.Items[i].Metadata.Labels["maestro.io/source-id"] == adapterName {
bundles = append(bundles, result.Items[i])
}
}

return bundles, nil
}

// ListConsumers retrieves the list of registered Maestro consumers
// Returns a list of consumer names
func (c *Client) ListConsumers(ctx context.Context) ([]string, error) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/helper/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type AdapterDeploymentOptions struct {
ChartPath string
AdapterName string
Timeout time.Duration
SetValues map[string]string // Additional Helm --set values
}

// GenerateAdapterReleaseName generates a unique Helm release name for an adapter deployment
Expand Down Expand Up @@ -152,6 +153,11 @@ func (h *Helper) DeployAdapter(ctx context.Context, opts AdapterDeploymentOption
"--set", fmt.Sprintf("fullnameOverride=%s", releaseName),
)

// Add additional --set values if provided
for key, value := range opts.SetValues {
helmArgs = append(helmArgs, "--set", fmt.Sprintf("%s=%s", key, value))
}

logger.Info("executing Helm command", "args", helmArgs)

// Create context with timeout
Expand Down
Loading