ws — наблюдаемая переменная WebSocketSubject.
socket — переменная для подписки на ws.
url — ссылка на сокет.
public message: Subject = new Subject();
public opened: Subject = new Subject();
message — наблюдаемая переменная в которую транслируются все данные из сокета.
opened — наблюдаемая переменная которая следит за открытием/закрытием соединения с сокетом.
public close():void{
this.socket.unsubscribe();
this.ws.complete();
}
Функция для закрытия сокета.
public sendMessage( message:string ):void{
this.ws.next( message );
}
Эта функция открывает соединение с сокетом, записывая его объект в наблюдаемую переменную и подписывается на трансляцию из нее. При потере связи каждую секунду пытается восстановить связь.
ChannelWebsocketService
Наследуемый сервис для подписки на каналы Rails5 Action Cable:
import { Injectable } from "@angular/core";
import { Subject } from "rxjs/Subject";
import { WebSocketService } from "./websocket.service";
@Injectable()
export class ChannelWebsocketService {
private socketStarted: boolean;
public observableData: Subject = new Subject();
public identifier:Object = {};
public identifierStr: string;
public subscribed: Subject = new Subject();
constructor( private websocketService: WebSocketService ){
this.observeOpened();
this.observeMessage();
}
private static encodeIdentifier( identifier:string ):Object{
return JSON.parse( identifier );
}
private static getDataString( parameters:Object ):string{
let first = true,
result = '';
for ( let key in parameters ){
if( first ){
first = false;
result += `\"${ key }\":\"${ parameters[ key ] }\"`;
} else {
result += `, \"${ key }\":\"${ parameters[ key ] }\"`;
}
}
return `{ ${ result } }`;
}
private getSubscribeString():string{
this.identifierStr = ChannelWebsocketService.getDataString( this.identifier );
return JSON.stringify( {
command: 'subscribe',
identifier: this.identifierStr
} );
};
private isThisChannel( data:Object ):boolean {
if( data[ 'identifier' ] ){
let identifier = ChannelWebsocketService.encodeIdentifier( data[ 'identifier' ] );
if ( JSON.stringify( identifier ) === JSON.stringify( this.identifier ) ){
return true;
}
}
return false;
}
private observeMessage(){
let self = this;
this.websocketService.message.subscribe( ( data: Object ) => {
if( self.isThisChannel( data ) ){
if( data[ 'type' ] && data[ 'type' ] == 'confirm_subscription' ){
this.subscribed.next( true );
} else if ( data[ 'message' ] ){
this.observableData.next( data[ 'message' ] );
}
}
} );
}
private observeOpened(){
let self = this;
this.websocketService.opened.subscribe( ( data: boolean ) => {
self.socketStarted = data;
if( data ){
self.subscribe();
}
} );
}
private subscribe(){
this.websocketService.sendMessage( this.getSubscribeString() );
}
public send( data: Object ){
this.websocketService.sendMessage( JSON.stringify( {
command:'message',
identifier: this.identifierStr,
data: ChannelWebsocketService.getDataString( data )
} ) );
}
public unsubscribe(){
this.websocketService.sendMessage( JSON.stringify( {
command: 'unsubscribe',
identifier: this.identifierStr } ) );
this.subscribed.next( false );
}
}
В данном сервисе есть 2 приватные и 3 публичные переменные, а также 7 приватных и 2 публичные функции.
socketStarted — переменная в которую транслируется состояние подписки на сокет.
identifierStr — специально подготовленная строка идентификатор для Rails5 Action Cable канала.
public observableData: Subject = new Subject();
public identifier:Object = {};
public subscribed: Subject = new Subject();
observableData — наблюдаемая переменная в которую записывается сообщение из сокета для канала.
identifier — объект идентификатор для Rails5 Action Cable канала.
subscribed — наблюдаемая переменная в которую записывается состояние подписки.
В конструкторе этого сервиса мы вызываем 2 функции: observeMessage и observeOpened, которые отслеживают присланные сокетом данные и состояние сокета соответственно.
private static encodeIdentifier( identifier:string ):Object{
return JSON.parse( identifier );
}
private static getDataString( parameters:Object ):string{
let first = true,
result = '';
for ( let key in parameters ){
if( first ){
first = false;
result += `\"${ key }\":\"${ parameters[ key ] }\"`;
} else {
result += `, \"${ key }\":\"${ parameters[ key ] }\"`;
}
}
return `{ ${ result } }`;
}
encodeIdentifier — статическая приватная функция декодирует строку идентификатора, которую вернул сокет для идентификации сообщения на принадлежность к каналу.
getDataString — преобразовывает объект в формат строки, который принимает Rails5 Action Cable.
Cервис, унаследованный от ChannelWebsocketService для подписки на канал ChatChannel:
import { Injectable } from "@angular/core";
import { ChannelWebsocketService } from "./channel.websocket.service";
import { WebSocketService } from "./websocket.service";
@Injectable()
export class ChatChannelService extends ChannelWebsocketService {
constructor( websocketService: WebSocketService ){
super( websocketService );
this.identifier = {
channel: 'ChatChannel'
};
}
}
В конструкторе этого сервиса переопределяется переменная identifier для идентификации канала.
ChatComponent
Компонент который используя ChatChannelService принимает/отправляет данные в канал.
Пример кода не привожу, он есть в GitHub, ссылка на который приведена ниже
Пример
Здесь можно скачать пример.
Для старта клиентского приложения переходим в папку «client» и вызываем:
npm install
gulp
Надеюсь эта статья поможет разобраться в данном вопросе.