Просмотр исходного кода

Merge branch 'worktree-agent-a303e65e'

User 2 месяцев назад
Родитель
Сommit
ca21cc9464

+ 133 - 0
src/hooks/use-realtime-channel.ts

@@ -0,0 +1,133 @@
+"use client";
+
+import { useEffect, useRef, useState } from "react";
+import type { RealtimeChannel, RealtimePostgresChangesPayload } from "@supabase/supabase-js";
+import { getSupabaseBrowserClient } from "@/lib/supabase/client";
+import {
+  type ConnectionStatus,
+  ReconnectionManager,
+  cleanupChannel,
+} from "@/lib/realtime/subscription-manager";
+
+interface PostgresChangesConfig<T extends Record<string, unknown>> {
+  event: "INSERT" | "UPDATE" | "DELETE" | "*";
+  schema: string;
+  table: string;
+  filter?: string;
+  onPayload: (payload: RealtimePostgresChangesPayload<T>) => void;
+}
+
+interface UseRealtimeChannelOptions<T extends Record<string, unknown>> {
+  /** Unique channel name. Changing this unsubscribes from the old channel and subscribes to the new one. */
+  channelName: string | null;
+  /** Postgres changes subscription config. */
+  config: PostgresChangesConfig<T>;
+  /** Whether the subscription is enabled. Defaults to true. */
+  enabled?: boolean;
+}
+
+interface UseRealtimeChannelReturn {
+  status: ConnectionStatus;
+}
+
+/**
+ * Generic hook for Supabase Realtime channel management.
+ * Handles subscribe/unsubscribe lifecycle, connection state tracking,
+ * and reconnection with exponential backoff.
+ */
+export function useRealtimeChannel<T extends Record<string, unknown>>({
+  channelName,
+  config,
+  enabled = true,
+}: UseRealtimeChannelOptions<T>): UseRealtimeChannelReturn {
+  const [status, setStatus] = useState<ConnectionStatus>("disconnected");
+  const channelRef = useRef<RealtimeChannel | null>(null);
+  const reconnectRef = useRef(new ReconnectionManager());
+  const onPayloadRef = useRef(config.onPayload);
+  const configRef = useRef(config);
+
+  // Sync refs inside an effect to satisfy react-hooks/refs (React 19 strict mode)
+  useEffect(() => {
+    onPayloadRef.current = config.onPayload;
+    configRef.current = config;
+  });
+
+  // Reset status when subscription is disabled
+  const isActive = !!channelName && enabled;
+  if (!isActive && status !== "disconnected") {
+    setStatus("disconnected");
+  }
+
+  useEffect(() => {
+    if (!channelName || !enabled) {
+      if (channelRef.current) {
+        const supabase = getSupabaseBrowserClient();
+        cleanupChannel(channelRef.current, (ch) => supabase.removeChannel(ch));
+        channelRef.current = null;
+      }
+      return;
+    }
+
+    const reconnect = reconnectRef.current;
+
+    // Capture non-null value for use in the subscribe closure
+    const activeChannelName = channelName;
+
+    function subscribe(): void {
+      const supabase = getSupabaseBrowserClient();
+      const cfg = configRef.current;
+
+      if (channelRef.current) {
+        cleanupChannel(channelRef.current, (ch) => supabase.removeChannel(ch));
+        channelRef.current = null;
+      }
+
+      setStatus("connecting");
+
+      const channel = supabase
+        .channel(activeChannelName)
+        .on<T>(
+          "postgres_changes",
+          {
+            event: cfg.event,
+            schema: cfg.schema,
+            table: cfg.table,
+            ...(cfg.filter ? { filter: cfg.filter } : {}),
+          },
+          (payload) => {
+            onPayloadRef.current(payload);
+          },
+        )
+        .subscribe((subscribedStatus) => {
+          if (subscribedStatus === "SUBSCRIBED") {
+            setStatus("connected");
+            reconnect.reset();
+          } else if (
+            subscribedStatus === "CHANNEL_ERROR" ||
+            subscribedStatus === "TIMED_OUT"
+          ) {
+            setStatus("error");
+            reconnect.schedule(() => subscribe());
+          } else if (subscribedStatus === "CLOSED") {
+            setStatus("disconnected");
+          }
+        });
+
+      channelRef.current = channel;
+    }
+
+    subscribe();
+
+    return () => {
+      reconnect.clear();
+      if (channelRef.current) {
+        const supabase = getSupabaseBrowserClient();
+        cleanupChannel(channelRef.current, (ch) => supabase.removeChannel(ch));
+        channelRef.current = null;
+      }
+      setStatus("disconnected");
+    };
+  }, [channelName, enabled]);
+
+  return { status };
+}

+ 70 - 0
src/hooks/use-realtime-movies.ts

@@ -0,0 +1,70 @@
+"use client";
+
+import { useCallback } from "react";
+import type { RealtimePostgresChangesPayload } from "@supabase/supabase-js";
+import { useQueryClient } from "@tanstack/react-query";
+import type { Database } from "@/types/database";
+import { useRealtimeChannel } from "./use-realtime-channel";
+import { buildEqFilter } from "@/lib/realtime/subscription-manager";
+
+type MovieRow = Database["public"]["Tables"]["movies"]["Row"];
+
+function moviesQueryKey(groupId: string): readonly [string, string] {
+  return ["movies", groupId] as const;
+}
+
+/**
+ * Hook that subscribes to Supabase Realtime changes on the movies table
+ * for a specific group_id. Updates the TanStack Query cache directly
+ * on INSERT, UPDATE, and DELETE events.
+ */
+export function useRealtimeMovies(groupId: string | null) {
+  const queryClient = useQueryClient();
+
+  const handlePayload = useCallback(
+    (payload: RealtimePostgresChangesPayload<MovieRow>) => {
+      if (!groupId) return;
+
+      const key = moviesQueryKey(groupId);
+
+      if (payload.eventType === "INSERT") {
+        const newMovie = payload.new;
+        queryClient.setQueryData<MovieRow[]>(key, (old) => {
+          if (!old) return [newMovie];
+          // Avoid duplicates (e.g., if the local optimistic update already added it)
+          if (old.some((m) => m.id === newMovie.id)) return old;
+          return [...old, newMovie];
+        });
+      } else if (payload.eventType === "UPDATE") {
+        const updated = payload.new;
+        queryClient.setQueryData<MovieRow[]>(key, (old) => {
+          if (!old) return old;
+          return old.map((m) => (m.id === updated.id ? updated : m));
+        });
+      } else if (payload.eventType === "DELETE") {
+        const deleted = payload.old;
+        if (deleted && "id" in deleted) {
+          queryClient.setQueryData<MovieRow[]>(key, (old) => {
+            if (!old) return old;
+            return old.filter((m) => m.id !== deleted.id);
+          });
+        }
+      }
+    },
+    [groupId, queryClient],
+  );
+
+  const { status } = useRealtimeChannel<MovieRow>({
+    channelName: groupId ? `movies:${groupId}` : null,
+    config: {
+      event: "*",
+      schema: "public",
+      table: "movies",
+      filter: groupId ? buildEqFilter("group_id", groupId) : undefined,
+      onPayload: handlePayload,
+    },
+    enabled: !!groupId,
+  });
+
+  return { status };
+}

+ 61 - 0
src/lib/realtime/subscription-manager.ts

@@ -0,0 +1,61 @@
+import type { RealtimeChannel } from "@supabase/supabase-js";
+
+export type ConnectionStatus = "connecting" | "connected" | "disconnected" | "error";
+
+/**
+ * Build a Postgres changes filter string for a column equality check.
+ * Example: buildEqFilter("group_id", "abc-123") => "group_id=eq.abc-123"
+ */
+export function buildEqFilter(column: string, value: string): string {
+  return `${column}=eq.${value}`;
+}
+
+const BASE_DELAY_MS = 1000;
+const MAX_DELAY_MS = 30000;
+
+/**
+ * Calculate exponential backoff delay with jitter.
+ */
+export function getBackoffDelay(attempt: number): number {
+  const exponential = Math.min(BASE_DELAY_MS * 2 ** attempt, MAX_DELAY_MS);
+  const jitter = Math.random() * exponential * 0.5;
+  return exponential + jitter;
+}
+
+/**
+ * Manages reconnection attempts with exponential backoff.
+ * Call `schedule` to queue a reconnection; call `reset` on successful connect; call `clear` on cleanup.
+ */
+export class ReconnectionManager {
+  private attempt = 0;
+  private timerId: ReturnType<typeof setTimeout> | null = null;
+
+  schedule(reconnectFn: () => void): void {
+    this.clear();
+    const delay = getBackoffDelay(this.attempt);
+    this.timerId = setTimeout(() => {
+      this.attempt++;
+      reconnectFn();
+    }, delay);
+  }
+
+  reset(): void {
+    this.attempt = 0;
+    this.clear();
+  }
+
+  clear(): void {
+    if (this.timerId !== null) {
+      clearTimeout(this.timerId);
+      this.timerId = null;
+    }
+  }
+}
+
+export function cleanupChannel(
+  channel: RealtimeChannel,
+  removeChannel: (channel: RealtimeChannel) => void,
+): void {
+  channel.unsubscribe();
+  removeChannel(channel);
+}