안녕하세요, 이중혁입니다

배우고 경험한 기술들을 하나씩 정리하는 공간

실시간 채팅 시스템

WebSocket 기반의 실시간 채팅 시스템으로, JWT 인증, 룸 기반 채팅,
비밀번호 보호, 욕설 필터링 등의 기능을 제공합니다.

실시간 채팅

WebSocket 기반

JWT 인증

안전한 사용자 인증

룸 시스템

비밀번호 보호

Kafka 이벤트

서버 간 동기화

채팅 클라이언트 1

로그인이 필요합니다

Kafka 이벤트 시스템 (Kafka Event System)

Kafka를 활용한 실시간 채팅 시스템의 서버 간 동기화 및 이벤트 스트리밍 시스템입니다.

KafkaEvent Streaming

구현 내용

Kafka 이벤트 시스템

대규모 트래픽에서도 끊김 없이 흘러가는 대화. Kafka 중심의 이벤트 스트리밍으로 여러 서버 인스턴스가 하나의 유기체처럼 움직입니다. Producer가 채팅 이벤트를 토픽으로 안정적으로 전달하고, Consumer가 이를 실시간으로 수신해 각 파드에서 Socket.IO로 재브로드캐스트합니다. 멀티 파드 환경에서도 한 채팅방의 맥락과 순서를 지키는 것이 목표입니다.

핵심 기능

  • 이벤트 발행: 서버 표준 DTO로 직렬화하여 토픽에 전송
  • 이벤트 구독: 멀티 파드에서 동시 수신 및 룸 대상 재브로드캐스트
  • 서버 동기화: 파드 간 사용자/룸 상태를 일관되게 유지
  • 파티션 키: roomId 기반으로 메시지 순서 보장
  • 확장성: 고정 10 파티션 + 자동 리밸런싱으로 안정적인 스케일

이벤트 타입

  • user-connect: 인증 성공 시 발행
  • user-disconnect: 연결 종료 시 발행
  • room-join: handleJoinRoom 성공 시 발행
  • room-leave: handleLeaveRoom 성공 시 발행
  • message-send: handleMessage 브로드캐스트 시 발행

키 전략과 파티션 설계

  • Producer Key 설정: roomId를 Key로 설정하여 같은 채팅방 메시지 순서 보장
  • 고정 파티션: 10개 파티션으로 운영 복잡도 낮춤
  • Consumer Group: 파드별 고유 groupId 사용
  • 자동 리밸런싱: Kafka 자동 리밸런싱으로 안정적 확장

확장성 및 성능

  • 수평적 확장: 서버 인스턴스 추가로 무중단 확장
  • 고가용성: Replication, Fault Tolerance, Retry Mechanism
  • 성능 최적화: Producer/Consumer 설정 최적화
  • 부하 분산: Consumer Group으로 파티션 단위 부하 분산

기술 스택

  • Message Broker: Apache Kafka
  • Client Library: kafkajs
  • Pattern: Producer / Consumer
  • Topic: chat-events (10 파티션)
  • Consumer Group: chat-service-group-{serverId}

시스템 아키텍처

KafkaService - Connection Management
  • • Producer/Consumer 연결
  • • 생명주기 관리
  • • 재연결 처리
KafkaProducerService - Event Publishing
  • • 채팅 이벤트 발행
  • • 메시지 직렬화
  • • 에러 처리
KafkaConsumerService - Event Consumption
  • • 이벤트 구독
  • • 메시지 역직렬화
  • • 이벤트 라우팅
데이터 플로우
1서버 A에서 채팅 이벤트 발생
2ChattingKafkaProducerService가 이벤트를 Kafka 토픽에 발행
3서버 B의 ChattingKafkaConsumerService가 이벤트 수신
4ChattingService의 consumer 메서드들이 이벤트 처리 및 WebSocket 브로드캐스트
Producer Key 설정 비교
Key 미설정 (라운드 로빈):
장점: 파티션에 균등 분산, 전체 처리량 최대화
단점: 같은 룸 메시지가 서로 다른 파티션으로 분산되어 순서 보장 불가
Key 설정 (roomId):
장점: 같은 룸 메시지가 항상 같은 파티션으로 가서 순서 보장
단점: 특정 인기 룸에 트래픽 집중 시 해당 파티션 핫스팟 가능
파티션 전략
동적 파티션 증설:
장점: 파드 수 증가에 맞춰 처리량 수평 확장
단점: 운영 복잡도↑, 리밸런싱 지연 동안 메시지 처리 공백/지연
고정 파티션 (10개):
장점: 운영 단순, 예측 가능한 동작
단점: 파드 수 < 파티션 수면 일부 파티션 미할당 가능
@Injectable()
export class ChattingKafkaService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(ChattingKafkaService.name);
  private kafka: Kafka;
  private producer: Producer;
  private consumer: Consumer;

  async onModuleInit(): Promise<void> {
    if (!EnvProvider.isProd()) return;
    try {
      // 디버깅: 환경변수 값 확인
      this.logger.log(`KAFKA_BROKER 환경변수: ${process.env.KAFKA_BROKER}`);
      this.logger.log(`SERVER_ID 환경변수: ${process.env.SERVER_ID}`);

      this.kafka = new Kafka({
        clientId: `${process.env.NODE_ENV}-portfolio-chat-service`,
        brokers: [
          process.env.KAFKA_BROKER || 'kafka.kafka.svc.cluster.local:9092',
        ],
        retry: {
          initialRetryTime: 100,
          retries: 8,
        },
      });

      const splitServerId = process.env.SERVER_ID.split('-');
      const shortServerId = splitServerId[splitServerId.length - 1];

      this.producer = this.kafka.producer();
      this.consumer = this.kafka.consumer({
        groupId: `chat-service-group-${shortServerId}`,
      });

      await this.producer.connect();
      await this.consumer.connect();

      this.logger.log('Kafka 연결 성공');
    } catch (error) {
      this.logger.error('Kafka 연결 실패:', error);
      throw error;
    }
  }

  async onModuleDestroy(): Promise<void> {
    if (!EnvProvider.isProd()) return;
    try {
      await this.producer?.disconnect();
      await this.consumer?.disconnect();
      this.logger.log('Kafka 연결 해제 완료');
    } catch (error) {
      this.logger.error('Kafka 연결 해제 실패:', error);
    }
  }

  getProducer(): Producer {
    if (!EnvProvider.isProd()) return;
    if (!this.producer) {
      throw new Error('Kafka producer가 초기화되지 않았습니다.');
    }
    return this.producer;
  }

  getConsumer(): Consumer {
    if (!EnvProvider.isProd()) return;
    if (!this.consumer) {
      throw new Error('Kafka consumer가 초기화되지 않았습니다.');
    }
    return this.consumer;
  }
}

실시간 채팅 시스템 (Real-time Chat System)

WebSocket 기반의 실시간 채팅 시스템으로, JWT 인증, 룸 기반 채팅, 비밀번호 보호, 텍스트 필터링(naughty-words) 등의 기능을 제공합니다.

WebSocketJWTRedisSocket.IO

구현 내용

실시간 채팅 시스템

실시간 채팅 시스템은 WebSocket 기반의 실시간 통신을 제공하는 시스템으로, JWT 인증을 통한 안전한 사용자 인증과 룸 기반 채팅을 지원합니다.RedisKafka를 활용한 다중 서버 동기화 및 데이터 관리 구조를 제공합니다.

핵심 기능

  • JWT 인증: WebSocket 연결 시 토큰 검증
  • 룸 기반 채팅: 비밀번호 보호된 독립 채팅방
  • 실시간 통신: Socket.IO 기반 즉시 메시지 전송
  • 다중 서버 동기화: Kafka 기반 서버 간 이벤트 동기화
  • 텍스트 필터링: naughty-words 다국어 자동 필터링

연결 관리

  • handleConnection: JWT 토큰 검증 및 사용자 인증
  • handleDisconnect: 연결 해제 시 정리 작업
  • afterInit: WebSocket 서버 초기화
  • Redis 저장: 사용자 세션 데이터 관리

룸 관리

  • handleJoinRoom: 룸 입장 및 비밀번호 검증
  • handleLeaveRoom: 룸 퇴장 및 정리
  • handleGetRoomUsers: 룸 내 사용자 목록 조회
  • 자동 정리: 빈 룸 자동 삭제

이벤트 시스템

  • authenticated: 인증 성공 이벤트
  • room-joined/left: 룸 입장/퇴장 알림
  • user-joined/left: 사용자 입장/퇴장 알림
  • message: 실시간 메시지 전송
  • room-users: 룸 사용자 목록 응답
  • error: 에러 처리 및 클라이언트 알림

기술 스택

  • Backend: NestJS, Socket.IO
  • Database: Redis
  • Message Queue: Kafka
  • Authentication: JWT, OAuth

시스템 아키텍처

ChattingGateway - WebSocket Layer
  • • 연결 관리
  • • 이벤트 라우팅
  • • JWT 인증
ChattingService - Business Logic
  • • 메시지 처리
  • • 룸 관리
  • • 텍스트 필터링 (naughty-words)
메시지 처리
  • handleMessage: 메시지 수신 및 브로드캐스트
  • 텍스트 필터링: naughty-words 다국어 자동 필터링
  • 타임스탬프: 메시지 시간 기록
  • Kafka Producer: 메시지 이벤트 발행
Kafka 동기화
  • Producer: producerJoinRoom, producerLeaveRoom
  • Producer: producerChatMessage, producerDisconnectUser
  • Consumer: consumerJoinRoom, consumerLeaveRoom
  • Consumer: consumerChatMessage, consumerDisconnectUser
데이터 저장소
  • Redis: 사용자 세션, 룸 정보, 실시간 데이터
  • Kafka: 서버 간 이벤트 동기화, 메시지 큐

@WebSocketGateway({ 
  namespace: 'socket/chat', 
  cors: { 
    origin: [process.env.CORS_DOMAIN],
    credentials: true
  } 
})
export class ChattingGateway
  implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
  private readonly logger = new Logger(ChattingGateway.name);

  constructor(private readonly chattingService: ChattingService) {}

  @WebSocketServer() server: Server;

  afterInit(): void {
    this.chattingService.setServer(this.server);
  }

  async handleConnection(client: Socket): Promise<void> {
    try {
      const payload = await this.chattingService.validateAndSetupClient(client);
      if (!payload) {
        client.emit('error', { message: '유효하지 않은 인증 토큰입니다.' });
        client.disconnect();
        return;
      }

      const socketId = await this.chattingService.getConnectedSocketId(
        payload.id,
      );

      await this.chattingService.connectUser(payload, client);

      await this.chattingService.producerDisconnectUser(socketId);
    } catch (error) {
      this.logger.error(`연결 처리 실패: ${error.message}`);
    }
  }

  async handleDisconnect(client: Socket): Promise<void> {
    try {
      await this.chattingService.producerLeaveRoom(client);
      await this.chattingService.producerDisconnectUser(client.id);

      await this.chattingService.disconnectUser(client);
    } catch (error) {
      this.logger.error(
        `연결 해제 처리 실패 - 클라이언트 ${client.id}:`,
        error,
      );
    }
  }

  @SubscribeMessage('join-room')
  async handleJoinRoom(
    @MessageBody() data: JoinRoomDto,
    @ConnectedSocket() client: Socket,
  ): Promise<void> {
    const { roomId, password } = data;

    // 룸 비밀번호 확인/생성 위임
    const ensure = await this.chattingService.ensureRoomWithPassword(
      client,
      roomId,
      password,
    );

    if (!ensure.ok) return;

    // 기존 룸에서 모두 나가기
    await this.chattingService.producerLeaveRoom(client);

    // 최종 입장 처리
    await this.chattingService.producerJoinRoom(client, roomId, ensure.isExist);
  }

  @SubscribeMessage('leave-room')
  async handleLeaveRoom(@ConnectedSocket() client: Socket): Promise<void> {
    await this.chattingService.producerLeaveRoom(client);
  }

  @SubscribeMessage('message')
  async handleMessage(
    @MessageBody() data: ChatMessageDto,
    @ConnectedSocket() client: Socket,
  ): Promise<void> {
    const { message, username } = data;

    await this.chattingService.producerChatMessage(client, message, username);
  }

  @SubscribeMessage('get-room-users')
  async handleGetRoomUsers(@ConnectedSocket() client: Socket): Promise<void> {
    const roomId = client.data.currentRoom;

    if (!client.rooms.has(roomId)) {
      this.logger.warn(`Client ${client.id} not in room ${roomId}`);
      client.emit('error', {
        message: '해당 룸에 입장하지 않았습니다.',
      });
      return;
    }

    // 서비스에서 룸 사용자 목록 가져오기
    await this.chattingService.getRoomUsers(client, roomId);
  }
}