[Из песочницы] Angular2 + Websocket + RxJS + Rails5

image

Всем привет! Эта статья о том как связать клиентское приложение Angular2 с Rails 5 сервером используя Websocket.

Rails


Для примера нам потребуется простейший ApplicationCable: Channel:
class ChatChannel < ApplicationCable::Channel
  def subscribed
    stream_from 'chat_channel'
  end

  def index
    ActionCable.server.broadcast 'chat_channel', { messages: Message.serialize_all( Message.all ) }
  end

  def create( data )
    Message.create( name: data[ 'name' ], text: data[ 'text' ] );
    ActionCable.server.broadcast 'chat_channel', { messages: Message.serialize_all( Message.all ) }
  end

  def unsubscribed
  end
end

Angular2


WebSocketService


Для начала нам потребуется сервис, который будет обеспечивать наше приложение обменом данных с сервером:
import { Injectable }                           from "@angular/core";
import { Subject, Observable, Subscription }    from 'rxjs/Rx';
import { WebSocketSubject }                     from "rxjs/observable/dom/WebSocketSubject";

@Injectable()
export class WebSocketService {

    private ws: WebSocketSubject;
    private socket: Subscription;
    private url: string;

    public message: Subject = new Subject();
    public opened: Subject = new Subject();

    public close():void{
        this.socket.unsubscribe();
        this.ws.complete();
    }

    public sendMessage( message:string ):void{
        this.ws.next( message );
    }

    public start( url: string ):void{
        let self = this;

        this.url = url;

        this.ws = Observable.webSocket( this.url );

        this.socket = this.ws.subscribe( {
            
            next: ( data:MessageEvent ) => {
                if( data[ 'type' ] == 'welcome' ){
                    self.opened.next( true );
                }
                this.message.next( data );
            },
            error: () => {

                self.opened.next( false );
                this.message.next( { type: 'closed' } );

                self.socket.unsubscribe();

                setTimeout( () => {
                    self.start( self.url );
                }, 1000 );

            },
            complete: () => {
                this.message.next( { type: 'closed' } );
            }
            
        } );
        
    }
    
}

В данном сервисе есть 3 приватные и 2 публичные переменные, а также 3 публичные функции.

private ws: WebSocketSubject;
private socket: Subscription;
private url: string;

ws — наблюдаемая переменная WebSocketSubject.
socket — переменная для подписки на ws.
url — ссылка на сокет.

public message: Subject = new Subject();
public opened: Subject = new Subject();

message — наблюдаемая переменная в которую транслируются все данные из сокета.
opened — наблюдаемая переменная которая следит за открытием/закрытием соединения с сокетом.

public close():void{
    this.socket.unsubscribe();
    this.ws.complete();
}

Функция для закрытия сокета.

public sendMessage( message:string ):void{
    this.ws.next( message );
}

Функция для отправки данных в сокет.

public start( url: string ):void{
    let self = this;

    this.url = url;

    this.ws = Observable.webSocket( this.url );

    this.socket = this.ws.subscribe( {

        next: ( data:MessageEvent ) => {
            if( data[ 'type' ] == 'welcome' ){
                self.opened.next( true );
            }
            this.message.next( data );
        },
        error: () => {

            self.opened.next( false );
            this.message.next( { type: 'closed' } );

            self.socket.unsubscribe();

            setTimeout( () => {
                self.start( self.url );
            }, 1000 );

        },
        complete: () => {
            this.message.next( { type: 'closed' } );
        }

    } );

}

Эта функция открывает соединение с сокетом, записывая его объект в наблюдаемую переменную и подписывается на трансляцию из нее. При потере связи каждую секунду пытается восстановить связь.

ChannelWebsocketService


Наследуемый сервис для подписки на каналы Rails5 Action Cable:

import { Injectable }              from "@angular/core";
import { Subject }                 from "rxjs/Subject";

import { WebSocketService }        from "./websocket.service";

@Injectable()
export class ChannelWebsocketService {

    private socketStarted: boolean;

    public observableData: Subject = new Subject();
    public identifier:Object = {};
    public identifierStr: string;
    public subscribed: Subject = new Subject();

    constructor( private websocketService: WebSocketService ){

        this.observeOpened();
        this.observeMessage();

    }

    private static encodeIdentifier( identifier:string ):Object{

        return JSON.parse( identifier );

    }

    private static getDataString( parameters:Object ):string{

        let first = true,
            result = '';

        for ( let key in parameters ){

            if( first ){

                first = false;
                result +=  `\"${ key }\":\"${ parameters[ key ] }\"`;

            } else {

                result += `, \"${ key }\":\"${ parameters[ key ] }\"`;

            }

        }

        return `{ ${ result } }`;

    }

    private getSubscribeString():string{

        this.identifierStr = ChannelWebsocketService.getDataString( this.identifier );

        return JSON.stringify( {
            command: 'subscribe',
            identifier: this.identifierStr
        } );

    };

    private isThisChannel( data:Object ):boolean {

        if( data[ 'identifier' ] ){

            let identifier = ChannelWebsocketService.encodeIdentifier( data[ 'identifier' ] );

            if ( JSON.stringify( identifier ) === JSON.stringify( this.identifier ) ){

                return true;

            }

        }

        return false;

    }

    private observeMessage(){

        let self = this;

        this.websocketService.message.subscribe( ( data: Object ) => {

            if( self.isThisChannel( data ) ){

                if( data[ 'type' ] && data[ 'type' ] == 'confirm_subscription' ){

                    this.subscribed.next( true );

                } else if ( data[ 'message' ] ){

                    this.observableData.next( data[ 'message' ] );

                }

            }

        } );

    }

    private observeOpened(){

        let self = this;

        this.websocketService.opened.subscribe( ( data: boolean ) => {

            self.socketStarted = data;

            if( data ){
                self.subscribe();
            }

        } );

    }
    
    private subscribe(){

        this.websocketService.sendMessage( this.getSubscribeString() );

    }
    

    public send( data: Object ){

        this.websocketService.sendMessage( JSON.stringify( {
            command:'message',
            identifier: this.identifierStr,
            data: ChannelWebsocketService.getDataString( data )
        } ) );

    }

    public unsubscribe(){

        this.websocketService.sendMessage( JSON.stringify( {
            command: 'unsubscribe',
            identifier: this.identifierStr } ) );
        this.subscribed.next( false );

    }

}

В данном сервисе есть 2 приватные и 3 публичные переменные, а также 7 приватных и 2 публичные функции.

private socketStarted: boolean;
private identifierStr: string;

socketStarted — переменная в которую транслируется состояние подписки на сокет.
identifierStr — специально подготовленная строка идентификатор для Rails5 Action Cable канала.

public observableData: Subject = new Subject();
public identifier:Object = {};
public subscribed: Subject = new Subject();

observableData — наблюдаемая переменная в которую записывается сообщение из сокета для канала.
identifier — объект идентификатор для Rails5 Action Cable канала.
subscribed — наблюдаемая переменная в которую записывается состояние подписки.

constructor( private websocketService: WebSocketService ){

    this.observeOpened();
    this.observeMessage();

}

...

private observeMessage(){

    let self = this;

    this.websocketService.message.subscribe( ( data: Object ) => {

        if( self.isThisChannel( data ) ){

            if( data[ 'type' ] && data[ 'type' ] == 'confirm_subscription' ){

                this.subscribed.next( true );

            } else if ( data[ 'message' ] ){

                this.observableData.next( data[ 'message' ] );

            }

        }

    } );

}

private observeOpened(){

    let self = this;

    this.websocketService.opened.subscribe( ( data: boolean ) => {

        self.socketStarted = data;

        if( data ){
            self.subscribe();
        }

    } );

}

В конструкторе этого сервиса мы вызываем 2 функции: observeMessage и observeOpened, которые отслеживают присланные сокетом данные и состояние сокета соответственно.

private static encodeIdentifier( identifier:string ):Object{

    return JSON.parse( identifier );

}

private static getDataString( parameters:Object ):string{

    let first = true,
        result = '';

    for ( let key in parameters ){

        if( first ){

            first = false;
            result +=  `\"${ key }\":\"${ parameters[ key ] }\"`;

        } else {

            result += `, \"${ key }\":\"${ parameters[ key ] }\"`;

        }

    }

    return `{ ${ result } }`;

}

encodeIdentifier — статическая приватная функция декодирует строку идентификатора, которую вернул сокет для идентификации сообщения на принадлежность к каналу.
getDataString — преобразовывает объект в формат строки, который принимает Rails5 Action Cable.

private getSubscribeString():string{

    this.identifierStr = ChannelWebsocketService.getDataString( this.identifier );

    return JSON.stringify( {
        command: 'subscribe',
        identifier: this.identifierStr
    } );

};

Возвращает строку для подписки на канал Rails5 Action Cable.

private isThisChannel( data:Object ):boolean {

    if( data[ 'identifier' ] ){

        let identifier = ChannelWebsocketService.encodeIdentifier( data[ 'identifier' ] );

        if ( JSON.stringify( identifier ) === JSON.stringify( this.identifier ) ){

            return true;

        }

    }

    return false;

}

Определяет принадлежность сообщения от сокета к данному каналу.

 private subscribe(){

    this.websocketService.sendMessage( this.getSubscribeString() );

}

Подписывает на канал.

public send( data: Object ){

    this.websocketService.sendMessage( JSON.stringify( {
        command:'message',
        identifier: this.identifierStr,
        data: ChannelWebsocketService.getDataString( data )
    } ) );

}

Отправляет данные в канал Rails5 Action Cable;

public unsubscribe(){

    this.websocketService.sendMessage( JSON.stringify( {
        command: 'unsubscribe',
        identifier: this.identifierStr } ) );
    this.subscribed.next( false );

}

Отписывает от канала.

ChatChannelService


Cервис, унаследованный от ChannelWebsocketService для подписки на канал ChatChannel:

import { Injectable }                           from "@angular/core";

import { ChannelWebsocketService }    from "./channel.websocket.service";
import { WebSocketService }               from "./websocket.service";

@Injectable()
export class ChatChannelService extends ChannelWebsocketService {

    constructor( websocketService: WebSocketService ){

        super( websocketService );

        this.identifier = {
            channel: 'ChatChannel'
        };

    }

}

В конструкторе этого сервиса переопределяется переменная identifier для идентификации канала.

ChatComponent


Компонент который используя ChatChannelService принимает/отправляет данные в канал.
Пример кода не привожу, он есть в GitHub, ссылка на который приведена ниже

Пример


Здесь можно скачать пример.

Для старта клиентского приложения переходим в папку «client» и вызываем:

npm install
gulp

Надеюсь эта статья поможет разобраться в данном вопросе.

Комментарии (0)

© Habrahabr.ru